aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java154
1 files changed, 142 insertions, 12 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index d98e8d3a..c90a8064 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +30,16 @@ import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -40,19 +51,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -63,27 +83,134 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
private final Gson gson;
private final ClientErrorReasonPresenter clientErrorReasonPresenter;
private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
-
+ private static Properties props;
+ private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS";
+ private static Consumer<String, String> consumer;
+
public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
- ClientErrorReasonPresenter clientErrorReasonPresenter) {
+ ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception {
this.httpClient = httpClient;
this.gson = gson;
this.clientErrorReasonPresenter = clientErrorReasonPresenter;
+ setProperties();
+ }
+
+ /**
+ * New constructor that does not take DMaaP parameters as arguments.
+ *
+ * @throws Exception
+ */
+ public MessageRouterSubscriberImpl() throws Exception {
+ this.httpClient = null;
+ this.gson = null;
+ this.clientErrorReasonPresenter = null;
+ setProperties();
}
@Override
public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
- LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
- return httpClient.call(buildGetHttpRequest(request))
- .map(this::buildGetResponse)
- .doOnError(ReadTimeoutException.class,
- e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
- .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
- .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
- .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+ LOGGER.info("Requesting new items from DMaaP MR: {}", request);
+ String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl());
+
+ String fakeGroupName = request.consumerGroup();
+ props.put("client.id", request.consumerId());
+ props.put("group.id", fakeGroupName);
+
+ try{
+ if (consumer == null) {
+ if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) {
+ LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic);
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason("404 Topic Not Found")
+ .build());
+ }
+ consumer = getKafkaConsumer(props);
+ consumer.subscribe(Arrays.asList(topic));
+ }
+ ArrayList<String> msgs = new ArrayList<>();
+
+ ConsumerRecords<String, String> records = null;
+ synchronized (consumer) {
+ records = consumer.poll(Duration.ofMillis(500));
+ }
+ for (ConsumerRecord<String, String> rec : records) {
+ msgs.add(rec.value());
+ }
+ List<JsonElement> list = List.ofAll(msgs).map(r -> JsonParser.parseString(r));
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .items(list)
+ .build());
+ } catch(Exception e) {
+ LOGGER.error("Error while consuming the messages : {}",e.getMessage());
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason(e.getMessage())
+ .build());
+ }
}
-
+
+ @Override
+ public void setConsumer(Consumer<String, String> consumer) {
+ MessageRouterSubscriberImpl.consumer = consumer;
+ }
+
+ public static KafkaConsumer<String, String> getKafkaConsumer(Properties props){
+ return new KafkaConsumer<>(props);
+ }
+
+ @Override
+ public void close(){
+ if(consumer != null) {
+ LOGGER.info("Closing the Kafka Consumer");
+ consumer.close();
+ consumer = null;
+ }
+ Commons.closeKafkaAdminClient();
+ }
+
+ void setProperties() throws Exception {
+ props = setKafkaPropertiesFromSystemEnv(System.getenv());
+
+ if(System.getenv(kafkaBootstrapServers) == null) {
+ LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ }else {
+ props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+ }
+ props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false);
+
+ if(System.getenv("JAAS_CONFIG") == null) {
+ LOGGER.info("Not using any authentication for kafka interaction");
+ }else {
+ LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+ }
+ }
+
+// @Override
+// public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
+// LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
+// return httpClient.call(buildGetHttpRequest(request))
+// .map(this::buildGetResponse)
+// .doOnError(ReadTimeoutException.class,
+// e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+// .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+// .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+// .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+// .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+// }
+ public static Properties getAdminProps() {
+ Properties adminProps = new Properties();
+ adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+ if(System.getenv("JAAS_CONFIG") != null) {
+ adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+ adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+ adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+ }
+ return adminProps;
+ }
+
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
return ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
@@ -132,4 +259,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
.map(HashMap::of)
.getOrElse(HashMap.empty());
}
+
+
+
}