/* * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END===================================== */ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.control.Option; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author Piotr Jaszczyk * @since March 2019 */ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { private final RxHttpClient httpClient; private final Gson gson; private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class); private static Properties props; private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; private static Consumer consumer; public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.gson = gson; this.clientErrorReasonPresenter = clientErrorReasonPresenter; setProperties(); } /** * New constructor that does not take DMaaP parameters as arguments. * * @throws Exception */ public MessageRouterSubscriberImpl() throws Exception { this.httpClient = null; this.gson = null; this.clientErrorReasonPresenter = null; setProperties(); } @Override public Mono get(MessageRouterSubscribeRequest request) { LOGGER.info("Requesting new items from DMaaP MR: {}", request); String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl()); String fakeGroupName = request.consumerGroup(); props.put("client.id", request.consumerId()); props.put("group.id", fakeGroupName); try{ if (consumer == null) { if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) { LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason("404 Topic Not Found") .build()); } consumer = getKafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); } ArrayList msgs = new ArrayList<>(); ConsumerRecords records = null; synchronized (consumer) { records = consumer.poll(Duration.ofMillis(500)); } for (ConsumerRecord rec : records) { msgs.add(rec.value()); } List list = List.ofAll(msgs).map(r -> JsonParser.parseString(r)); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .items(list) .build()); } catch(Exception e) { LOGGER.error("Error while consuming the messages : {}",e.getMessage()); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(e.getMessage()) .build()); } } @Override public void setConsumer(Consumer consumer) { MessageRouterSubscriberImpl.consumer = consumer; } public static KafkaConsumer getKafkaConsumer(Properties props){ return new KafkaConsumer<>(props); } @Override public void close(){ if(consumer != null) { LOGGER.info("Closing the Kafka Consumer"); consumer.close(); consumer = null; } Commons.closeKafkaAdminClient(); } void setProperties() throws Exception { props = setKafkaPropertiesFromSystemEnv(System.getenv()); if(System.getenv(kafkaBootstrapServers) == null) { LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); }else { props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); } props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false); if(System.getenv("JAAS_CONFIG") == null) { LOGGER.info("Not using any authentication for kafka interaction"); }else { LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); } } // @Override // public Mono get(MessageRouterSubscribeRequest request) { // LOGGER.debug("Requesting new items from DMaaP MR: {}", request); // return httpClient.call(buildGetHttpRequest(request)) // .map(this::buildGetResponse) // .doOnError(ReadTimeoutException.class, // e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) // .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) // .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) // .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) // .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); // } public static Properties getAdminProps() { Properties adminProps = new Properties(); adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); if(System.getenv("JAAS_CONFIG") != null) { adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); } return adminProps; } private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { return ImmutableHttpRequest.builder() .method(HttpMethod.GET) .url(buildSubscribeUrl(request)) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) .customHeaders(headers(request)) .timeout(timeout(request).getOrNull()) .build(); } private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) { final ImmutableMessageRouterSubscribeResponse.Builder builder = ImmutableMessageRouterSubscribeResponse.builder(); return httpResponse.successful() ? builder.items(getAsJsonElements(httpResponse)).build() : builder.failReason(extractFailReason(httpResponse)).build(); } private List getAsJsonElements(HttpResponse httpResponse) { JsonArray bodyAsJsonArray = httpResponse .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class); return List.ofAll(bodyAsJsonArray).map(arrayElement -> JsonParser.parseString(arrayElement.getAsString())); } private String buildSubscribeUrl(MessageRouterSubscribeRequest request) { return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(), request.consumerId()); } private Mono buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(failReason) .build()); } private Option timeout(MessageRouterSubscribeRequest request) { return Option.of(request.timeoutConfig()) .map(DmaapTimeoutConfig::getTimeout); } private Map headers(MessageRouterSubscribeRequest request) { return Option.of(request.sourceDefinition().aafCredentials()) .map(Commons::basicAuthHeader) .map(HashMap::of) .getOrElse(HashMap.empty()); } }