diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
2 files changed, 11 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 1f3dd6547..1ccf230d7 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.nio.charset.Charset +import java.util.UUID import java.util.concurrent.Phaser import javax.annotation.PreDestroy @@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer( launch { try { ph.register() - log.trace("Consumed Message : $message") - val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() + val key = message.key() ?: UUID.randomUUID().toString() + val value = String(message.value(), Charset.defaultCharset()) + log.trace("Consumed Message : key($key) value($value)") + val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(executionServiceOutput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index fca73981e..93885bfa2 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -71,9 +71,10 @@ class KafkaPublishAuditService( */ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) + val key = secureExecutionServiceInput.actionIdentifiers.blueprintName try { this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + this.inputInstance!!.sendMessage(key, secureExecutionServiceInput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : ${e.message}" @@ -89,9 +90,10 @@ class KafkaPublishAuditService( */ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { executionServiceOutput.correlationUUID = correlationUUID + val key = executionServiceOutput.actionIdentifiers.blueprintName try { this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + this.outputInstance!!.sendMessage(key, executionServiceOutput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : $e" |