/* * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END===================================== */ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.collection.Stream; import io.vavr.control.Option; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import java.net.ConnectException; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author Piotr Jaszczyk * @since March 2019 */ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final RxHttpClient httpClient; private final int maxBatchSize; private final Duration maxBatchDuration; private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static Properties props; private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); private static Producer kafkaProducer; public static Future future; static boolean flag; static Exception exception; public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.maxBatchSize = maxBatchSize; this.maxBatchDuration = maxBatchDuration; this.clientErrorReasonPresenter = clientErrorReasonPresenter; setProperties(); } /** * New constructor that does not take DMaaP parameters as arguments. * * @throws Exception */ public MessageRouterPublisherImpl() throws Exception { this.httpClient = null; this.maxBatchSize = 0; this.maxBatchDuration = null; this.clientErrorReasonPresenter = null; setProperties(); } // @Override // public Flux put( // MessageRouterPublishRequest request, // Flux items) { // return items.bufferTimeout(maxBatchSize, maxBatchDuration) // .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems))); // } @Override public Flux put( MessageRouterPublishRequest request, Flux items) { flag = true; exception=null; future = null; List batch = getBatch(items); String topic = getTopicFromTopicUrl(request.sinkDefinition().topicUrl()); LOGGER.info("Topic extracted from URL {} is : {} ",request.sinkDefinition().topicUrl(),topic); LOGGER.info("Sending a batch of {} items for topic {} to kafka", batch.size(),topic); LOGGER.trace("The items to be sent: {}", batch); if(kafkaProducer == null) { kafkaProducer = new KafkaProducer<>(props); } Flux response; try { for (String msg : batch) { ProducerRecord data = new ProducerRecord<>(topic, msg); future = kafkaProducer.send(data,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { flag=false; exception = e; } } }); } if(flag) { LOGGER.info("Sent a batch of {} items for topic {} to kafka", batch.size(),topic); response = Flux.just(ImmutableMessageRouterPublishResponse.builder().items(List.ofAll(items.collectList().block())).build()); }else { throw exception; } }catch(Exception e) { LOGGER.error("Error while publishing the messages : {}",e.getStackTrace()); response = Flux.just(ImmutableMessageRouterPublishResponse.builder() .failReason(e.getMessage()) .build()); } return response; } @Override public void close() { LOGGER.info("Closing the Kafka Producer"); if(kafkaProducer != null) { kafkaProducer.close(); kafkaProducer=null; } } @Override public void setKafkaProducer(Producer kafkaProducer) { this.kafkaProducer = kafkaProducer; } public static Future getFuture(){ return future; } void setProperties() throws Exception { props = setKafkaPropertiesFromSystemEnv(System.getenv()); if(System.getenv(kafkaBootstrapServers) == null) { LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); }else { props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); } if(System.getenv("JAAS_CONFIG") == null) { LOGGER.info("Not using any authentication for kafka interaction"); }else { LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); } } static List getBatch(Flux items){ java.util.List list = new ArrayList<>(); items.map(msg -> msg.toString()).collectList().subscribe(data -> list.addAll(data)); return List.ofAll(list); } private Publisher pushBatchToMr( MessageRouterPublishRequest request, List batch) { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)) .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) .onErrorResume(PoolAcquirePendingLimitException.class, e -> buildErrorResponse(ClientErrorReasons.CONNECTION_POLL_LIMIT)) .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch))); } private @NotNull RequestBody createBody(List subItems, ContentType contentType) { if (contentType == ContentType.APPLICATION_JSON) { final JsonArray elements = new JsonArray(subItems.size()); subItems.forEach(elements::add); return RequestBody.fromJson(elements); } else if (contentType == ContentType.TEXT_PLAIN) { String messages = subItems.map(JsonElement::toString) .collect(Collectors.joining("\n")); return RequestBody.fromString(messages); } else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { return ImmutableHttpRequest.builder() .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) .customHeaders(headers(request)) .body(body) .timeout(timeout(request).getOrNull()) .build(); } private MessageRouterPublishResponse buildResponse( HttpResponse httpResponse, List batch) { final ImmutableMessageRouterPublishResponse.Builder builder = ImmutableMessageRouterPublishResponse.builder(); return httpResponse.successful() ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); } private Mono buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) .build()); } private Option timeout(MessageRouterPublishRequest request) { return Option.of(request.timeoutConfig()) .map(DmaapTimeoutConfig::getTimeout); } private Map headers(MessageRouterPublishRequest request) { Map headers = Option.of(request.sinkDefinition().aafCredentials()) .map(Commons::basicAuthHeader) .map(HashMap::of) .getOrElse(HashMap.empty()); return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString()); } }