aboutsummaryrefslogtreecommitdiffstats
path: root/openecomp-ui/test/softwareProduct/creation
diff options
context:
space:
mode:
authorfranciscovila <javier.paradela.vila@est.tech>2022-05-09 17:31:27 +0100
committerFrancisco Javier Paradela Vila <javier.paradela.vila@est.tech>2022-05-09 16:33:55 +0000
commit42df714817c6f3a4d1ee7bdb650528b3f33acd79 (patch)
tree42be4061221e1332b1431dc47a364b5935681c8e /openecomp-ui/test/softwareProduct/creation
parent401f001b36be5508dd8c129430126e49e68d1b5b (diff)
VLM duplicate name gives generic error
Provide user with specific error message when name is alrady in use Issue-ID: SDC-3991 Signed-off-by: franciscovila <javier.paradela.vila@est.tech> Change-Id: I7d2c839d2ed14d17257adacfe2de6978ac0e61a4
Diffstat (limited to 'openecomp-ui/test/softwareProduct/creation')
0 files changed, 0 insertions, 0 deletions
9' href='#n39'>39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
/*
 *  Copyright © 2019 IBM.
 *  Modifications Copyright © 2021 Bell Canada.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api

import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.updateErrorMessage
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
import java.nio.charset.Charset
import java.util.UUID
import java.util.concurrent.Phaser
import javax.annotation.PreDestroy

@ConditionalOnProperty(
    name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
    havingValue = "true"
)
@Service
open class BlueprintProcessingKafkaConsumer(
    private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
    private val executionServiceHandler: ExecutionServiceHandler,
    private val meterRegistry: MeterRegistry
) {

    val log = logger(BlueprintProcessingKafkaConsumer::class)

    private val ph = Phaser(1)

    private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService

    companion object {

        const val CONSUMER_SELECTOR = "self-service-api"
        const val PRODUCER_SELECTOR = "self-service-api"
    }

    @EventListener(ApplicationReadyEvent::class)
    fun setupMessageListener() = GlobalScope.launch {
        try {
            log.info(
                "Setting up message consumer($CONSUMER_SELECTOR)" +
                    "message producer($PRODUCER_SELECTOR)..."
            )

            /** Get the Message Consumer Service **/
            blueprintMessageConsumerService = try {
                blueprintMessageLibPropertyService
                    .blueprintMessageConsumerService(CONSUMER_SELECTOR)
            } catch (e: BlueprintProcessorException) {
                val errorMsg = "Failed creating Kafka consumer message service."
                throw e.updateErrorMessage(
                    SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
                    "Wrong Kafka selector provided or internal error in Kafka service."
                )
            } catch (e: Exception) {
                throw BlueprintProcessorException("failed to create consumer service ${e.message}")
            }

            /** Get the Message Producer Service **/
            val blueprintMessageProducerService = try {
                blueprintMessageLibPropertyService
                    .blueprintMessageProducerService(PRODUCER_SELECTOR)
            } catch (e: BlueprintProcessorException) {
                val errorMsg = "Failed creating Kafka producer message service."
                throw e.updateErrorMessage(
                    SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
                    "Wrong Kafka selector provided or internal error in Kafka service."
                )
            } catch (e: Exception) {
                throw BlueprintProcessorException("failed to create producer service ${e.message}")
            }

            launch {
                /** Subscribe to the consumer topics */
                val additionalConfig: MutableMap<String, Any> = hashMapOf()
                val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
                channel.consumeEach { message ->
                    launch {
                        try {
                            ph.register()
                            val key = message.key() ?: UUID.randomUUID().toString()
                            val value = String(message.value(), Charset.defaultCharset())
                            val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
                            log.info(
                                "Consumed Message : topic(${message.topic()}) " +
                                    "partition(${message.partition()}) " +
                                    "leaderEpoch(${message.leaderEpoch().get()}) " +
                                    "offset(${message.offset()}) " +
                                    "key(${message.key()}) " +
                                    BlueprintMessageUtils.getMessageLogData(executionServiceInput)
                            )
                            val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
                            blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
                        } catch (e: Exception) {
                            meterRegistry.counter(
                                BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
                                BlueprintMessageUtils.kafkaMetricTag(message.topic())
                            ).increment()
                            log.error("failed in processing the consumed message : $message", e)
                        } finally {
                            ph.arriveAndDeregister()
                        }
                    }
                }
            }
        } catch (e: Exception) {
            log.error(
                "failed to start message consumer($CONSUMER_SELECTOR) " +
                    "message producer($PRODUCER_SELECTOR) ",
                e
            )
        }
    }

    @PreDestroy
    fun shutdownMessageListener() = runBlocking {
        try {
            log.info(
                "Shutting down message consumer($CONSUMER_SELECTOR)" +
                    "message producer($PRODUCER_SELECTOR)..."
            )
            blueprintMessageConsumerService.shutDown()
            ph.arriveAndAwaitAdvance()
        } catch (e: Exception) {
            log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
        }
    }
}