aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
blob: cccc61f40c9188450f0445762584757a0f970550 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
 *  Copyright © 2019 IBM.
 *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.onap.ccsdk.cds.blueprintsprocessor.message.service

import com.fasterxml.jackson.databind.node.ObjectNode
import io.micrometer.core.instrument.MeterRegistry
import org.apache.commons.lang3.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.slf4j.LoggerFactory
import java.nio.charset.Charset

class KafkaMessageProducerService(
    private val messageProducerProperties: MessageProducerProperties,
    private val meterRegistry: MeterRegistry
) :
    BlueprintMessageProducerService {

    private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!!

    private var kafkaProducer: KafkaProducer<String, ByteArray>? = null

    private val messageLoggerService = MessageLoggerService()

    companion object {

        const val MAX_ERR_MSG_LEN = 128
    }

    override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
        return sendMessageNB(key, messageProducerProperties.topic!!, message, headers)
    }

    override suspend fun sendMessageNB(
        key: String,
        topic: String,
        message: Any,
        headers: MutableMap<String, String>?
    ): Boolean {
        var clonedMessage = message
        if (clonedMessage is ExecutionServiceOutput) {
            clonedMessage = truncateResponse(clonedMessage)
        }

        val byteArrayMessage = when (clonedMessage) {
            is String -> clonedMessage.toByteArray(Charset.defaultCharset())
            else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
        }

        val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage)
        val recordHeaders = record.headers()
        messageLoggerService.messageProducing(recordHeaders)
        headers?.let {
            headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
        }
        val callback = Callback { metadata, exception ->
            meterRegistry.counter(
                BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
                BlueprintMessageUtils.kafkaMetricTag(topic)
            ).increment()
            if (exception != null) {
                meterRegistry.counter(
                    BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
                    BlueprintMessageUtils.kafkaMetricTag(topic)
                ).increment()
                log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception)
            } else {
                log.info(
                    "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
                        "partition(${metadata.partition()}) " +
                        "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}."
                )
            }
        }
        messageTemplate().send(record, callback)
        return true
    }

    fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
        log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
        val configProps = messageProducerProperties.getConfig()
        /** Add additional Properties */
        if (additionalConfig != null)
            configProps.putAll(additionalConfig)

        if (kafkaProducer == null)
            kafkaProducer = KafkaProducer(configProps)

        return kafkaProducer!!
    }

    /**
     * Truncation of BP responses
     */
    private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
        /** Truncation of error messages */
        var truncErrMsg = executionServiceOutput.status.errorMessage
        if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
            truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) +
                " [...]. Check Blueprint Processor logs for more information."
        }
        /** Truncation for Command Executor responses */
        var truncPayload = executionServiceOutput.payload.deepCopy()
        val workflowName = executionServiceOutput.actionIdentifiers.actionName
        if (truncPayload.path("$workflowName-response").has("execute-command-logs")) {
            var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode
            cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
        }
        return ExecutionServiceOutput().apply {
            correlationUUID = executionServiceOutput.correlationUUID
            commonHeader = executionServiceOutput.commonHeader
            actionIdentifiers = executionServiceOutput.actionIdentifiers
            status = Status().apply {
                code = executionServiceOutput.status.code
                eventType = executionServiceOutput.status.eventType
                timestamp = executionServiceOutput.status.timestamp
                errorMessage = truncErrMsg
                message = executionServiceOutput.status.message
            }
            payload = truncPayload
            stepData = executionServiceOutput.stepData
        }
    }
}