From d661dbcf431f0f02ecf98f748e3516ba0ab23dff Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Fri, 17 Aug 2018 12:34:58 +0200 Subject: Add seed code. First version based on PRH micro service. Change-Id: Iea1673a8a1961006b1ea98ef245e213e3652eb82 Issue-ID: DCAEGEN2-638 Signed-off-by: elinuxhenrik --- .../config/DmaapConsumerConfiguration.java | 68 +++++++++++++++ .../datafile/config/DmaapCustomConfig.java | 72 ++++++++++++++++ .../config/DmaapPublisherConfiguration.java | 47 ++++++++++ .../datafile/service/DMaaPReactiveWebClient.java | 88 +++++++++++++++++++ .../consumer/DMaaPConsumerReactiveHttpClient.java | 99 ++++++++++++++++++++++ .../producer/DMaaPProducerReactiveHttpClient.java | 96 +++++++++++++++++++++ 6 files changed, 470 insertions(+) create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java create mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java (limited to 'datafile-dmaap-client/src/main') diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java new file mode 100644 index 00000000..57b11127 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.config; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.springframework.stereotype.Component; + +/** + * @author Przemysław Wąsala on 3/23/18 + */ +@Component +@Value.Immutable(prehash = true) +@Value.Style(builder = "new") +@Gson.TypeAdapters +public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig { + + private static final long serialVersionUID = 1L; + + @Value.Parameter + public abstract String consumerId(); + + @Value.Parameter + public abstract String consumerGroup(); + + @Value.Parameter + public abstract Integer timeoutMs(); + + @Value.Parameter + public abstract Integer messageLimit(); + + + public interface Builder extends + DmaapCustomConfig.Builder { + + Builder consumerId(String consumerId); + + Builder consumerGroup(String consumerGroup); + + Builder timeoutMs(Integer timeoutMs); + + Builder messageLimit(Integer messageLimit); + } + + public static DmaapConsumerConfiguration.Builder builder() { + return ImmutableDmaapConsumerConfiguration.builder(); + } + +} \ No newline at end of file diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java new file mode 100644 index 00000000..31bbfc0e --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.config; + +import java.io.Serializable; + +import org.immutables.value.Value; + +/** + * @author Przemysław Wąsala on 3/28/18 + */ +public interface DmaapCustomConfig extends Serializable { + + @Value.Parameter + String dmaapHostName(); + + @Value.Parameter + Integer dmaapPortNumber(); + + @Value.Parameter + String dmaapTopicName(); + + @Value.Parameter + String dmaapProtocol(); + + @Value.Parameter + String dmaapUserName(); + + @Value.Parameter + String dmaapUserPassword(); + + @Value.Parameter + String dmaapContentType(); + + + interface Builder> { + + B dmaapHostName(String dmaapHostName); + + B dmaapPortNumber(Integer dmaapPortNumber); + + B dmaapTopicName(String dmaapTopicName); + + B dmaapProtocol(String dmaapProtocol); + + B dmaapUserName(String dmaapUserName); + + B dmaapUserPassword(String dmaapUserPassword); + + B dmaapContentType(String dmaapContentType); + + T build(); + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java new file mode 100644 index 00000000..cd520569 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.config; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; + +/** + * @author Przemysław Wąsala on 3/23/18 + */ +@Value.Immutable(prehash = true) +@Value.Style(builder = "new") +@Gson.TypeAdapters +public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig { + + private static final long serialVersionUID = 1L; + + interface Builder extends + DmaapCustomConfig.Builder { + + } + + public static DmaapPublisherConfiguration.Builder builder() { + return ImmutableDmaapPublisherConfiguration.builder(); + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java new file mode 100644 index 00000000..b4cbfeea --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DMaaPReactiveWebClient.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; +import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 7/4/18 + */ +public class DMaaPReactiveWebClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String dmaaPContentType; + private String dmaaPUserName; + private String dmaaPUserPassword; + + /** + * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig. + * + * @param dmaapCustomConfig - configuration object + * @return DMaaPReactiveWebClient + */ + public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { + this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); + this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); + this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); + return this; + } + + /** + * Construct Reactive WebClient with appropriate settings. + * + * @return WebClient + */ + public WebClient build() { + return WebClient.builder() + .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) + .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword)) + .filter(logRequest()) + .filter(logResponse()) + .build(); + } + + private ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response Status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } + +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java new file mode 100644 index 00000000..1fcebeac --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DMaaPConsumerReactiveHttpClient.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service.consumer; + +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 6/26/18 + */ +public class DMaaPConsumerReactiveHttpClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final String dmaapProtocol; + private final Integer dmaapPortNumber; + private final String dmaapTopicName; + private final String consumerGroup; + private final String consumerId; + + /** + * Constructor of DMaaPConsumerReactiveHttpClient. + * + * @param consumerConfiguration - DMaaP consumer configuration object + */ + public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) { + this.dmaapHostName = consumerConfiguration.dmaapHostName(); + this.dmaapProtocol = consumerConfiguration.dmaapProtocol(); + this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber(); + this.dmaapTopicName = consumerConfiguration.dmaapTopicName(); + this.consumerGroup = consumerConfiguration.consumerGroup(); + this.consumerId = consumerConfiguration.consumerId(); + } + + /** + * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic. + * + * @return reactive response from DMaaP in string format + */ + public Mono getDMaaPConsumerResponse() { + try { + return webClient + .get() + .uri(getUri()) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> + Mono.error(new Exception("HTTP 400")) + ) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> + Mono.error(new Exception("HTTP 500"))) + .bodyToMono(String.class); + } catch (URISyntaxException e) { + logger.warn("Exception while evaluating URI "); + return Mono.error(e); + } + } + + private String createRequestPath() { + return dmaapTopicName + "/" + consumerGroup + "/" + consumerId; + } + + public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { + this.webClient = webClient; + return this; + } + + URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(createRequestPath()).build(); + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java new file mode 100644 index 00000000..c6889df4 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DMaaPProducerReactiveHttpClient.java @@ -0,0 +1,96 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.http.client.utils.URIBuilder; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 7/4/18 + */ +public class DMaaPProducerReactiveHttpClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private WebClient webClient; + private final String dmaapHostName; + private final Integer dmaapPortNumber; + private final String dmaapProtocol; + private final String dmaapTopicName; + + /** + * Constructor DMaaPProducerReactiveHttpClient. + * + * @param dmaapPublisherConfiguration - DMaaP producer configuration object + */ + public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); + this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); + this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); + this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); + } + + /** + * Function for calling DMaaP HTTP producer - post request to DMaaP. + * + * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @return status code of operation + */ + public Mono getDMaaPProducerResponse(Mono consumerDmaapModelMono) { + try { + return webClient + .post() + .uri(getUri()) + .body(BodyInserters.fromObject(consumerDmaapModelMono)) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, clientResponse -> + Mono.error(new Exception("HTTP 400")) + ) + .onStatus(HttpStatus::is5xxServerError, clientResponse -> + Mono.error(new Exception("HTTP 500"))) + .bodyToMono(String.class); + } catch (URISyntaxException e) { + logger.warn("Exception while evaluating URI"); + return Mono.error(e); + } + } + + public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) { + this.webClient = webClient; + return this; + } + + URI getUri() throws URISyntaxException { + return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber) + .setPath(dmaapTopicName).build(); + } + +} -- cgit 1.2.3-korg