From f394594ec70aaf1eefa4f23b80226c3426dbc17a Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Fri, 14 Sep 2018 15:49:10 +0200 Subject: Deliver first version of Datafile Change-Id: Iadd1455d7fe45b4c022dd7fde2f8a506d1b7cd57 Issue-ID: DCAEGEN2-640 Signed-off-by: elinuxhenrik --- .../config/DmaapConsumerConfiguration.java | 21 ++- .../datafile/config/DmaapCustomConfig.java | 10 +- .../config/DmaapPublisherConfiguration.java | 17 +-- .../datafile/service/DMaaPReactiveWebClient.java | 88 ------------ .../datafile/service/DmaapReactiveWebClient.java | 86 ++++++++++++ .../collectors/datafile/service/HttpUtils.java | 30 ++++ .../consumer/DMaaPConsumerReactiveHttpClient.java | 99 -------------- .../consumer/DmaapConsumerReactiveHttpClient.java | 99 ++++++++++++++ .../producer/DMaaPProducerReactiveHttpClient.java | 96 ------------- .../producer/DmaapProducerReactiveHttpClient.java | 151 +++++++++++++++++++++ 10 files changed, 385 insertions(+), 312 deletions(-) delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java (limited to 'datafile-dmaap-client/src/main') diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java index 57b11127..dd7519f9 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,14 +13,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; import org.springframework.stereotype.Component; /** @@ -43,20 +40,20 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig { public abstract String consumerGroup(); @Value.Parameter - public abstract Integer timeoutMs(); + public abstract Integer timeoutMS(); @Value.Parameter public abstract Integer messageLimit(); - public interface Builder extends - DmaapCustomConfig.Builder { + public interface Builder + extends DmaapCustomConfig.Builder { Builder consumerId(String consumerId); Builder consumerGroup(String consumerGroup); - Builder timeoutMs(Integer timeoutMs); + Builder timeoutMS(Integer timeoutMS); Builder messageLimit(Integer messageLimit); } @@ -65,4 +62,4 @@ public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig { return ImmutableDmaapConsumerConfiguration.builder(); } -} \ No newline at end of file +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java index 31bbfc0e..0b1d99eb 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java index cd520569..d0918446 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,16 +13,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.config; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; -import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; /** * @author Przemysław Wąsala on 3/23/18 @@ -36,8 +31,8 @@ public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig { private static final long serialVersionUID = 1L; - interface Builder extends - DmaapCustomConfig.Builder { + interface Builder + extends DmaapCustomConfig.Builder { } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java deleted file mode 100644 index b4cbfeea..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; - -import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; -import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.ExchangeFilterFunction; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -/** - * @author Przemysław Wąsala on 7/4/18 - */ -public class DMaaPReactiveWebClient { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private String dmaaPContentType; - private String dmaaPUserName; - private String dmaaPUserPassword; - - /** - * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. - * - * @param dmaapCustomConfig - configuration object - * @return DMaaPReactiveWebClient - */ - public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { - this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); - this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); - this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); - return this; - } - - /** - * Construct Reactive WebClient with appropriate settings. - * - * @return WebClient - */ - public WebClient build() { - return WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) - .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword)) - .filter(logRequest()) - .filter(logResponse()) - .build(); - } - - private ExchangeFilterFunction logResponse() { - return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { - logger.info("Response Status {}", clientResponse.statusCode()); - return Mono.just(clientResponse); - }); - } - - private ExchangeFilterFunction logRequest() { - return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { - logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); - clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); - return Mono.just(clientRequest); - }); - } - -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java new file mode 100644 index 00000000..d5878b0d --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 7/4/18 + */ +public class DmaapReactiveWebClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String dmaaPContentType; + private String dmaaPUserName; + private String dmaaPUserPassword; + + /** + * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. + * + * @param dmaapCustomConfig - configuration object + * @return DmaapReactiveWebClient + */ + public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { + this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); + this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); + this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); + return this; + } + + /** + * Construct Reactive WebClient with appropriate settings. + * + * @return WebClient + */ + public WebClient build() { + return WebClient.builder() + .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) + .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword)) + .filter(logRequest()) + .filter(logResponse()) + .build(); + } + + private ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response Status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } + +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java new file mode 100644 index 00000000..2b44233f --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import org.apache.http.HttpStatus; + +public final class HttpUtils implements HttpStatus { + + private HttpUtils() {} + + public static boolean isSuccessfulResponseCode(Integer statusCode) { + return statusCode >= 200 && statusCode < 300; + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java deleted file mode 100644 index 1fcebeac..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.service.consumer; - -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.http.client.utils.URIBuilder; -import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -/** - * @author Przemysław Wąsala on 6/26/18 - */ -public class DMaaPConsumerReactiveHttpClient { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private WebClient webClient; - private final String dmaapHostName; - private final String dmaapProtocol; - private final Integer dmaapPortNumber; - private final String dmaapTopicName; - private final String consumerGroup; - private final String consumerId; - - /** - * Constructor of DMaaPConsumerReactiveHttpClient. - * - * @param consumerConfiguration - DMaaP consumer configuration object - */ - public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { - this.dmaapHostName = consumerConfiguration.dmaapHostName(); - this.dmaapProtocol = consumerConfiguration.dmaapProtocol(); - this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber(); - this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); - this.consumerGroup = consumerConfiguration.consumerGroup(); - this.consumerId = consumerConfiguration.consumerId(); - } - - /** - * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic. - * - * @return reactive response from DMaaP in string format - */ - public Mono getDMaaPConsumerResponse() { - try { - return webClient - .get() - .uri(getUri()) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI "); - return Mono.error(e); - } - } - - private String createRequestPath() { - return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; - } - - public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } - - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(createRequestPath()).build(); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java new file mode 100644 index 00000000..ad9e6fe7 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.consumer; + +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 6/26/18 + * @author Henrik Andersson + */ +public class DmaapConsumerReactiveHttpClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final String dmaapProtocol; + private final Integer dmaapPortNumber; + private final String dmaapTopicName; + private final String consumerGroup; + private final String consumerId; + + /** + * Constructor of DmaapConsumerReactiveHttpClient. + * + * @param consumerConfiguration - DMaaP consumer configuration object + */ + public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { + this.dmaapHostName = consumerConfiguration.dmaapHostName(); + this.dmaapProtocol = consumerConfiguration.dmaapProtocol(); + this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber(); + this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); + this.consumerGroup = consumerConfiguration.consumerGroup(); + this.consumerId = consumerConfiguration.consumerId(); + } + + /** + * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic. + * + * @return reactive response from DMaaP in string format + */ + public Mono getDmaapConsumerResponse() { + try { + return webClient + .get() + .uri(getUri()) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> + Mono.error(new Exception("HTTP 400")) + ) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> + Mono.error(new Exception("HTTP 500"))) + .bodyToMono(String.class); + } catch (URISyntaxException e) { + logger.error("Unable to parse URI in message from xNF.", e); + return Mono.error(e); + } + } + + private String createRequestPath() { + return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; + } + + public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; + } + + URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(createRequestPath()).build(); + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java deleted file mode 100644 index c6889df4..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.service.producer; - -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.http.client.utils.URIBuilder; -import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -/** - * @author Przemysław Wąsala on 7/4/18 - */ -public class DMaaPProducerReactiveHttpClient { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private WebClient webClient; - private final String dmaapHostName; - private final Integer dmaapPortNumber; - private final String dmaapProtocol; - private final String dmaapTopicName; - - /** - * Constructor DMaaPProducerReactiveHttpClient. - * - * @param dmaapPublisherConfiguration - DMaaP producer configuration object - */ - public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { - this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); - this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); - this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); - this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); - } - - /** - * Function for calling DMaaP HTTP producer - post request to DMaaP. - * - * @param consumerDmaapModelMono - object which will be sent to DMaaP - * @return status code of operation - */ - public Mono getDMaaPProducerResponse(Mono consumerDmaapModelMono) { - try { - return webClient - .post() - .uri(getUri()) - .body(BodyInserters.fromObject(consumerDmaapModelMono)) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) - ) - .onStatus(HttpStatus::is5xxServerError, clientResponse -> - Mono.error(new Exception("HTTP 500"))) - .bodyToMono(String.class); - } catch (URISyntaxException e) { - logger.warn("Exception while evaluating URI"); - return Mono.error(e); - } - } - - public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } - - URI getUri() throws URISyntaxException { - return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) - .setPath(dmaapTopicName).build(); - } - -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java new file mode 100644 index 00000000..8010bdc1 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -0,0 +1,151 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.http.HttpHeaders; +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; + +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 7/4/18 + * @author Henrik Andersson + */ +public class DmaapProducerReactiveHttpClient { + + private static final String X_ATT_DR_META = "X-ATT-DR-META"; + private static final String LOCATION = "location"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final Integer dmaapPortNumber; + private final String dmaapProtocol; + private final String dmaapTopicName; + private final String dmaapContentType; + + /** + * Constructor DmaapProducerReactiveHttpClient. + * + * @param dmaapPublisherConfiguration - DMaaP producer configuration object + */ + public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + + this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); + this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); + this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); + this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); + } + + /** + * Function for calling DMaaP HTTP producer - post request to DMaaP. + * + * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @return status code of operation + */ + public Mono getDmaapProducerResponse(Mono> consumerDmaapModelMono) { + consumerDmaapModelMono.subscribe(models -> postFilesAndData(models)); + return Mono.just(HttpStatus.OK.toString()); + } + + public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; + } + + private void postFilesAndData(List models) { + for (ConsumerDmaapModel consumerDmaapModel : models) { + postFileAndData(consumerDmaapModel); + } + } + + private void postFileAndData(ConsumerDmaapModel model) { + RequestBodyUriSpec post = webClient.post(); + + boolean headPrepared = prepareHead(model, post); + + if (headPrepared) { + prepareBody(model, post); + + ResponseSpec responseSpec = post.retrieve(); + responseSpec.onStatus(HttpStatus::is4xxClientError, + clientResponse -> handlePostErrors(model, clientResponse)); + responseSpec.onStatus(HttpStatus::is5xxServerError, + clientResponse -> handlePostErrors(model, clientResponse)); + String bodyToMono = responseSpec.bodyToMono(String.class).block(); + } + } + + private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { + boolean result = true; + try { + post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); + + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + metaData.getAsJsonObject().remove(LOCATION); + post.header(X_ATT_DR_META, metaData.toString()); + + post.uri(getUri()); + } catch (Exception e) { + logger.error("Unable to post file to Data Router. " + model, e); + result = false; + } + + return result; + } + + private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { + String fileLocation = model.getLocation(); + File fileResource = new File(fileLocation); + FileSystemResource httpResource = new FileSystemResource(fileResource); + post.body(BodyInserters.fromResource(httpResource)); + } + + private URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(dmaapTopicName).build(); + } + + private Mono handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { + String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString(); + logger.error(errorMessage); + + return Mono.error(new Exception(errorMessage)); + } +} -- cgit 1.2.3-korg