diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt')
-rw-r--r-- | ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt new file mode 100644 index 000000000..4d735d9e7 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -0,0 +1,157 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import io.micrometer.core.instrument.MeterRegistry +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.event.ApplicationReadyEvent +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import java.nio.charset.Charset +import java.util.UUID +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy + +@ConditionalOnProperty( + name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], + havingValue = "true" +) +@Service +open class BluePrintProcessingKafkaConsumer( + private val blueprintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler, + private val meterRegistry: MeterRegistry +) { + + val log = logger(BluePrintProcessingKafkaConsumer::class) + + private val ph = Phaser(1) + + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService + + companion object { + + const val CONSUMER_SELECTOR = "self-service-api" + const val PRODUCER_SELECTOR = "self-service-api" + } + + @EventListener(ApplicationReadyEvent::class) + fun setupMessageListener() = GlobalScope.launch { + try { + log.info( + "Setting up message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." + ) + + /** Get the Message Consumer Service **/ + blueprintMessageConsumerService = try { + blueprintMessageLibPropertyService + .blueprintMessageConsumerService(CONSUMER_SELECTOR) + } catch (e: BluePrintProcessorException) { + val errorMsg = "Failed creating Kafka consumer message service." + throw e.updateErrorMessage( + SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, + "Wrong Kafka selector provided or internal error in Kafka service." + ) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create consumer service ${e.message}") + } + + /** Get the Message Producer Service **/ + val blueprintMessageProducerService = try { + blueprintMessageLibPropertyService + .blueprintMessageProducerService(PRODUCER_SELECTOR) + } catch (e: BluePrintProcessorException) { + val errorMsg = "Failed creating Kafka producer message service." + throw e.updateErrorMessage( + SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, + "Wrong Kafka selector provided or internal error in Kafka service." + ) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + + launch { + /** Subscribe to the consumer topics */ + val additionalConfig: MutableMap<String, Any> = hashMapOf() + val channel = blueprintMessageConsumerService.subscribe(additionalConfig) + channel.consumeEach { message -> + launch { + try { + ph.register() + val key = message.key() ?: UUID.randomUUID().toString() + val value = String(message.value(), Charset.defaultCharset()) + val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() + log.info( + "Consumed Message : topic(${message.topic()}) " + + "partition(${message.partition()}) " + + "leaderEpoch(${message.leaderEpoch().get()}) " + + "offset(${message.offset()}) " + + "key(${message.key()}) " + + "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})" + ) + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) + } catch (e: Exception) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(message.topic()) + ).increment() + log.error("failed in processing the consumed message : $message", e) + } finally { + ph.arriveAndDeregister() + } + } + } + } + } catch (e: Exception) { + log.error( + "failed to start message consumer($CONSUMER_SELECTOR) " + + "message producer($PRODUCER_SELECTOR) ", + e + ) + } + } + + @PreDestroy + fun shutdownMessageListener() = runBlocking { + try { + log.info( + "Shutting down message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." + ) + blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() + } catch (e: Exception) { + log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) + } + } +} |