aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapConsumerConfiguration.java66
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapCustomConfig.java95
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapPublisherConfiguration.java44
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/ConsumerReactiveHttpClientFactory.java43
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java104
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java103
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DMaaPPublisherReactiveHttpClient.java102
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DmaaPRestTemplateFactory.java115
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/PublisherReactiveHttpClientFactory.java39
9 files changed, 711 insertions, 0 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapConsumerConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapConsumerConfiguration.java
new file mode 100644
index 00000000..f7107f7c
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapConsumerConfiguration.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
+ */
+@Component
+@Value.Immutable(prehash = true)
+@Value.Style(builder = "new")
+@Gson.TypeAdapters
+public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ public static DmaapConsumerConfiguration.Builder builder() {
+ return ImmutableDmaapConsumerConfiguration.builder();
+ }
+
+ @Value.Parameter
+ public abstract String consumerId();
+
+ @Value.Parameter
+ public abstract String consumerGroup();
+
+ @Value.Parameter
+ public abstract Integer timeoutMs();
+
+ @Value.Parameter
+ public abstract Integer messageLimit();
+
+ public interface Builder extends
+ DmaapCustomConfig.Builder<DmaapConsumerConfiguration, DmaapConsumerConfiguration.Builder> {
+
+ Builder consumerId(String consumerId);
+
+ Builder consumerGroup(String consumerGroup);
+
+ Builder timeoutMs(Integer timeoutMs);
+
+ Builder messageLimit(Integer messageLimit);
+ }
+
+} \ No newline at end of file
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapCustomConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapCustomConfig.java
new file mode 100644
index 00000000..efeaa85f
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapCustomConfig.java
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config;
+
+import java.io.Serializable;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/28/18
+ */
+public interface DmaapCustomConfig extends Serializable {
+
+ @Value.Parameter
+ String dmaapHostName();
+
+ @Value.Parameter
+ Integer dmaapPortNumber();
+
+ @Value.Parameter
+ String dmaapTopicName();
+
+ @Value.Parameter
+ String dmaapProtocol();
+
+ @Value.Parameter
+ String dmaapUserName();
+
+ @Value.Parameter
+ String dmaapUserPassword();
+
+ @Value.Parameter
+ String dmaapContentType();
+
+ @Value.Parameter
+ String trustStorePath();
+
+ @Value.Parameter
+ String trustStorePasswordPath();
+
+ @Value.Parameter
+ String keyStorePath();
+
+ @Value.Parameter
+ String keyStorePasswordPath();
+
+ @Value.Parameter
+ Boolean enableDmaapCertAuth();
+
+ interface Builder<T extends DmaapCustomConfig, B extends Builder<T, B>> {
+
+ B dmaapHostName(String dmaapHostName);
+
+ B dmaapPortNumber(Integer dmaapPortNumber);
+
+ B dmaapTopicName(String dmaapTopicName);
+
+ B dmaapProtocol(String dmaapProtocol);
+
+ B dmaapUserName(String dmaapUserName);
+
+ B dmaapUserPassword(String dmaapUserPassword);
+
+ B dmaapContentType(String dmaapContentType);
+
+ B trustStorePath(String trustStorePath);
+
+ B trustStorePasswordPath(String trustStorePasswordPath);
+
+ B keyStorePath(String keyStore);
+
+ B keyStorePasswordPath(String keyStorePass);
+
+ B enableDmaapCertAuth(Boolean enableDmaapCertAuth);
+
+ T build();
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapPublisherConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapPublisherConfiguration.java
new file mode 100644
index 00000000..3866f9b4
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapPublisherConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
+ */
+@Value.Immutable(prehash = true)
+@Value.Style(builder = "new")
+@Gson.TypeAdapters
+public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ public static DmaapPublisherConfiguration.Builder builder() {
+ return ImmutableDmaapPublisherConfiguration.builder();
+ }
+
+ interface Builder extends
+ DmaapCustomConfig.Builder<DmaapPublisherConfiguration, DmaapPublisherConfiguration.Builder> {
+
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/ConsumerReactiveHttpClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/ConsumerReactiveHttpClientFactory.java
new file mode 100644
index 00000000..7fd1021f
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/ConsumerReactiveHttpClientFactory.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
+
+import javax.net.ssl.SSLException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ */
+public class ConsumerReactiveHttpClientFactory {
+
+ private final DMaaPReactiveWebClientFactory reactiveWebClientFactory;
+
+ public ConsumerReactiveHttpClientFactory(DMaaPReactiveWebClientFactory reactiveWebClientFactory) {
+ this.reactiveWebClientFactory = reactiveWebClientFactory;
+ }
+
+ public DMaaPConsumerReactiveHttpClient create(DmaapConsumerConfiguration consumerConfiguration)
+ throws SSLException {
+ return new DMaaPConsumerReactiveHttpClient(consumerConfiguration,
+ reactiveWebClientFactory.build(consumerConfiguration));
+ }
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
new file mode 100644
index 00000000..f0172704
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.net.URI;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ */
+public class DMaaPConsumerReactiveHttpClient {
+
+ private final String dmaapHostName;
+ private final String dmaapProtocol;
+ private final Integer dmaapPortNumber;
+ private final String dmaapTopicName;
+ private final String consumerGroup;
+ private final String consumerId;
+ private final String contentType;
+ private final WebClient webClient;
+
+ /**
+ * Constructor of DMaaPConsumerReactiveHttpClient.
+ *
+ * @param consumerConfiguration - DMaaP consumer configuration object
+ */
+ DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration, WebClient webClient) {
+ this.dmaapHostName = consumerConfiguration.dmaapHostName();
+ this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
+ this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
+ this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
+ this.consumerGroup = consumerConfiguration.consumerGroup();
+ this.consumerId = consumerConfiguration.consumerId();
+ this.contentType = consumerConfiguration.dmaapContentType();
+ this.webClient = webClient;
+ }
+
+ /**
+ * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic.
+ *
+ * @return reactive response from DMaaP in string format
+ */
+ public Mono<String> getDMaaPConsumerResponse() {
+ return webClient
+ .get()
+ .uri(getUri())
+ .headers(getHeaders())
+ .retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse ->
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+ )
+ .onStatus(HttpStatus::is5xxServerError, clientResponse ->
+ Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+ .bodyToMono(String.class);
+ }
+
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ httpHeaders.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
+ }
+
+ private String createRequestPath() {
+ return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
+ }
+
+
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(createRequestPath()).build();
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
new file mode 100644
index 00000000..b1f2ab02
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
+
+import io.netty.handler.ssl.SslContext;
+import javax.net.ssl.SSLException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.http.client.reactive.ClientHttpConnector;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DMaaPReactiveWebClientFactory {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final SslFactory sslFactory;
+
+ public DMaaPReactiveWebClientFactory() {
+ this(new SslFactory());
+ }
+
+ DMaaPReactiveWebClientFactory(SslFactory sslFactory) {
+ this.sslFactory = sslFactory;
+ }
+
+ /**
+ * Construct Reactive WebClient with appropriate settings.
+ *
+ * @return WebClient
+ */
+ public WebClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+ SslContext sslContext = createSslContext(consumerConfiguration);
+ ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
+ HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+ return WebClient.builder()
+ .clientConnector(reactorClientHttpConnector)
+ .filter(logRequest())
+ .filter(logResponse())
+ .build();
+ }
+
+ private SslContext createSslContext(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+ if (consumerConfiguration.enableDmaapCertAuth()) {
+ return sslFactory.createSecureContext(
+ consumerConfiguration.keyStorePath(), consumerConfiguration.keyStorePasswordPath(),
+ consumerConfiguration.trustStorePath(), consumerConfiguration.trustStorePasswordPath()
+ );
+ }
+ return sslFactory.createInsecureContext();
+ }
+
+ private ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+ logger.info("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+ logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ MDC.remove(SERVICE_NAME);
+ return Mono.just(clientRequest);
+ });
+ }
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DMaaPPublisherReactiveHttpClient.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DMaaPPublisherReactiveHttpClient.java
new file mode 100644
index 00000000..43a0b4cc
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DMaaPPublisherReactiveHttpClient.java
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.CommonFunctions.createJsonBody;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.net.URI;
+import java.util.UUID;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Mono;
+
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DMaaPPublisherReactiveHttpClient {
+
+ private final Logger logger = LoggerFactory.getLogger(DMaaPPublisherReactiveHttpClient.class);
+ private final String dmaapHostName;
+ private final Integer dmaapPortNumber;
+ private final String dmaapProtocol;
+ private final String dmaapTopicName;
+ private final String dmaapContentType;
+ private final Mono<RestTemplate> restTemplateMono;
+
+ /**
+ * Constructor DMaaPPublisherReactiveHttpClient.
+ *
+ * @param dmaapPublisherConfiguration - DMaaP producer configuration object
+ */
+ DMaaPPublisherReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration,
+ Mono<RestTemplate> restTemplateMono) {
+ this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
+ this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
+ this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
+ this.restTemplateMono = restTemplateMono;
+ }
+
+ /**
+ * Function for calling DMaaP HTTP producer - post request to DMaaP.
+ *
+ * @param consumerDmaapModelMono - object which will be sent to DMaaP
+ * @return status code of operation
+ */
+
+ public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+ return Mono.defer(() -> {
+ HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ logger.info("Request: {} {}", getUri(), request);
+ return restTemplateMono.map(
+ restTemplate -> restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+ });
+ }
+
+ private HttpHeaders getAllHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ return headers;
+
+ }
+
+ URI getUri() {
+ return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+ .path(dmaapTopicName).build();
+ }
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DmaaPRestTemplateFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DmaaPRestTemplateFactory.java
new file mode 100644
index 00000000..fe2b2c10
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DmaaPRestTemplateFactory.java
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import javax.net.ssl.SSLContext;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+import reactor.core.publisher.Mono;
+
+public class DmaaPRestTemplateFactory {
+
+ /**
+ * Function for creating RestTemplate object.
+ *
+ * @param publisherConfiguration - DMaaP publisher configuration object
+ * @return RestTemplate with correct ssl configuration
+ */
+ public Mono<RestTemplate> build(DmaapPublisherConfiguration publisherConfiguration) {
+ if (publisherConfiguration.enableDmaapCertAuth()) {
+ return createRestTemplateWithSslSetup(publisherConfiguration);
+ }
+
+ return Mono.just(new RestTemplate());
+ }
+
+ private Mono<RestTemplate> createRestTemplateWithSslSetup(DmaapPublisherConfiguration publisherConfiguration) {
+ try {
+ RestTemplateBuilder builder = new RestTemplateBuilder();
+
+ SSLContext sslContext = createSslContext(publisherConfiguration,
+ loadPasswordFromFile(publisherConfiguration.keyStorePasswordPath()),
+ loadPasswordFromFile(publisherConfiguration.trustStorePasswordPath()));
+
+ return Mono.just(builder
+ .requestFactory(() -> createRequestFactory(sslContext)).build());
+
+ } catch (GeneralSecurityException | IOException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private SSLContext createSslContext(DmaapPublisherConfiguration publisherConfiguration,
+ String keyStorePassword, String trustStorePassword)
+ throws IOException, GeneralSecurityException {
+ return new SSLContextBuilder()
+ .loadKeyMaterial(
+ keyStore(publisherConfiguration.keyStorePath(), keyStorePassword),
+ keyStorePassword.toCharArray())
+ .loadTrustMaterial(
+ getFile(publisherConfiguration.trustStorePath()), trustStorePassword.toCharArray())
+ .build();
+ }
+
+ private HttpComponentsClientHttpRequestFactory createRequestFactory(SSLContext sslContext) {
+ SSLConnectionSocketFactory socketFactory =
+ new SSLConnectionSocketFactory(sslContext);
+ HttpClient httpClient = HttpClients.custom()
+ .setSSLSocketFactory(socketFactory).build();
+
+ return new HttpComponentsClientHttpRequestFactory(httpClient);
+ }
+
+ private KeyStore keyStore(String keyStoreFile, String keyStorePassword)
+ throws GeneralSecurityException, IOException {
+ KeyStore ks = KeyStore.getInstance("jks");
+ ks.load(getResource(keyStoreFile), keyStorePassword.toCharArray());
+ return ks;
+ }
+
+ private File getFile(String fileName) {
+ return new File(fileName);
+ }
+
+ private InputStream getResource(String fileName) throws FileNotFoundException {
+ return new FileInputStream(fileName);
+ }
+
+ private String loadPasswordFromFile(String path) throws IOException {
+ return new String(Files.readAllBytes(Paths.get(path)));
+ }
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/PublisherReactiveHttpClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/PublisherReactiveHttpClientFactory.java
new file mode 100644
index 00000000..e5f3e19e
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/PublisherReactiveHttpClientFactory.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+
+public class PublisherReactiveHttpClientFactory {
+
+ private final DmaaPRestTemplateFactory restTemplateFactory;
+
+ public PublisherReactiveHttpClientFactory(DmaaPRestTemplateFactory restTemplateFactory) {
+ this.restTemplateFactory = restTemplateFactory;
+ }
+
+ public DMaaPPublisherReactiveHttpClient create(DmaapPublisherConfiguration publisherConfiguration) {
+ return new DMaaPPublisherReactiveHttpClient(publisherConfiguration,
+ restTemplateFactory.build(publisherConfiguration));
+ }
+
+}