From 44b1e918621effeecdb1b775ab9d3786e73bb699 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Fri, 15 Nov 2019 09:49:42 -0500 Subject: Message prioritization error handling Error handling for message processor and forward errors to output sink Optimize and expose message prioritization state service dependencies Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: Iee04811871de5306ba1f7e37d6e6c78f1a969181 --- .../prioritization/AbstractTopologyComponents.kt | 2 +- .../MessagePrioritizationConsumer.kt | 4 ++ .../prioritization/MessagePrioritizationData.kt | 3 +- .../prioritization/MessagePrioritizeExtensions.kt | 26 ++++++---- .../prioritization/api/MessagePrioritizationApi.kt | 55 ++++++++++++++++++++++ .../db/MessagePrioritizationRepositories.kt | 22 ++++++--- .../db/PrioritizationMessageEntity.kt | 4 ++ .../service/MessagePrioritizationStateService.kt | 46 +++++++++++------- .../topology/MessageAggregateProcessor.kt | 38 +++++++++++---- .../topology/MessagePrioritizeProcessor.kt | 20 ++++++-- .../prioritization/utils/MessageProcessorUtils.kt | 36 ++++++++++++++ 11 files changed, 206 insertions(+), 50 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt index d89f71364..d114da521 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt @@ -34,7 +34,7 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa this.processorContext = context /** Get the State service to update in store */ this.messagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) + .messagePrioritizationStateService() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ef9d5a058..967cc190e 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService @@ -74,9 +75,12 @@ open class MessagePrioritizationConsumer( Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + /** To receive completed and error messages */ topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, prioritizationConfiguration.outputTopic, Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, MessagePrioritizationConstants.PROCESSOR_OUTPUT) // Output will be sent to the group-output topic from Processor API diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index d874cef92..3358a5643 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -28,8 +28,8 @@ enum class MessageState(val id: String) { EXPIRED("expired"), PRIORITIZED("prioritized"), AGGREGATED("aggregated"), - IGNORED("ignored"), COMPLETED("completed"), + ERROR("error") } open class PrioritizationConfiguration : Serializable { @@ -59,7 +59,6 @@ open class UpdateStateRequest : Serializable { lateinit var id: String var group: String? = null var state: String? = null - var notifyMessage: String? = null } data class CorrelationCheckResponse(var message: String? = null, diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 94fedf4df..ec061ad47 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -16,21 +16,27 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) - : ProcessorSupplier { - return ProcessorSupplier { - // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance - } -} +/** + * Register the MessagePrioritizationStateService and exposed dependency + */ +fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = + instance(MessagePrioritizationStateService::class) +/** + * Expose messagePrioritizationStateService to AbstractComponentFunction + */ +fun AbstractComponentFunction.messagePrioritizationStateService() = + BluePrintDependencyService.messagePrioritizationStateService() + +/** + * MessagePrioritization correlation extensions + */ fun MessagePrioritization.toFormatedCorrelation(): String { val ascendingKey = this.correlationId!!.split(",") .map { it.trim() }.sorted().joinToString(",") diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt new file mode 100644 index 000000000..382cb9c8a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -0,0 +1,55 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc +import org.springframework.http.MediaType +import org.springframework.web.bind.annotation.* + +@RestController +@RequestMapping(value = ["/api/v1/message-prioritization"]) +open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) { + + @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun ping(): String = "Success" + + + @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc { + messagePrioritizationStateService.getMessage(id) + } + + @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { + messagePrioritizationStateService.saveMessage(messagePrioritization) + } + + @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE]) + fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = + monoMdc { + messagePrioritizationStateService.setMessageState(updateMessageState.id, + updateMessageState.state!!) + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt index 307d932a9..69c81079d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt @@ -71,19 +71,27 @@ interface PrioritizationMessageRepository : JpaRepository, state: String): Int + @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids") + fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int @Modifying @Transactional - @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " + - "WHERE pm.id = :id") - fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int + @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id") + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id") + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int @Modifying @Transactional diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt index 4973cdf6e..1825f91c2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt @@ -48,6 +48,10 @@ open class MessagePrioritization { @Column(name = "message", nullable = false) var message: String? = null + @Lob + @Column(name = "error", nullable = true) + var error: String? = null + @Lob @Column(name = "aggregated_message_ids", nullable = true) var aggregatedMessageIds: String? = null diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt index e4369fc20..8424226c2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -33,15 +33,20 @@ interface MessagePrioritizationStateService { suspend fun getMessage(id: String): MessagePrioritization + suspend fun getMessages(ids: List): List? + suspend fun getExpiryEligibleMessages(count: Int): List? - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): List? + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int) + : List? - suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): List? + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int) + : List? suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? - suspend fun getCorrelatedMessages(group: String, states: List, types: List?, correlationIds: String): List? + suspend fun getCorrelatedMessages(group: String, states: List, types: List?, + correlationIds: String): List? suspend fun updateMessagesState(ids: List, state: String) @@ -51,7 +56,9 @@ interface MessagePrioritizationStateService { suspend fun setMessagesState(ids: List, state: String) - suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List): MessagePrioritization + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) suspend fun deleteMessage(id: String) @@ -64,7 +71,8 @@ interface MessagePrioritizationStateService { @Service open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService { + private val prioritizationMessageRepository: PrioritizationMessageRepository) + : MessagePrioritizationStateService { private val log = logger(MessagePrioritizationStateServiceImpl::class) @@ -82,6 +90,10 @@ open class MessagePrioritizationStateServiceImpl( ?: throw BluePrintProcessorException("couldn't find message for id($id)") } + override suspend fun getMessages(ids: List): List? { + return prioritizationMessageRepository.findAllById(ids) + } + override suspend fun getExpiryEligibleMessages(count: Int): List? { return prioritizationMessageRepository .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), @@ -115,6 +127,7 @@ open class MessagePrioritizationStateServiceImpl( } } + @Transactional override suspend fun updateMessagesState(ids: List, state: String) { ids.forEach { val updated = updateMessageState(it, state) @@ -124,12 +137,17 @@ open class MessagePrioritizationStateServiceImpl( @Transactional override suspend fun setMessageState(id: String, state: String) { - prioritizationMessageRepository.setStatusForMessageId(id, state) + prioritizationMessageRepository.setStateForMessageId(id, state, Date()) } @Transactional override suspend fun setMessagesState(ids: List, state: String) { - prioritizationMessageRepository.setStatusForMessageIds(ids, state) + prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) + } + + @Transactional + override suspend fun setMessageStateANdError(id: String, state: String, error: String) { + prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) } @Transactional @@ -141,16 +159,10 @@ open class MessagePrioritizationStateServiceImpl( return saveMessage(updateMessage) } - override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List) - : MessagePrioritization { - - val groupedIds = groupedMessageIds.joinToString(",") - val updateMessage = getMessage(id).apply { - this.updatedDate = Date() - this.state = state - this.aggregatedMessageIds = groupedIds - } - return saveMessage(updateMessage) + @Transactional + override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { + val groupedIds = aggregatedIds.joinToString(",") + prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) } override suspend fun deleteMessage(id: String) { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt index 8dd4019dd..45f5c773d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -32,14 +32,36 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id, + MessageState.ERROR.name, error) + /** Publish to Error topic */ + this.processorContext.forward(messagePrioritization.id, messagePrioritization, + To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } catch (sendException: Exception) { + log.error("failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e) + } + + } + } } } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 5a5aa2575..7dde2655d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -41,12 +41,22 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) + : ProcessorSupplier { + return ProcessorSupplier { + // Dynamically resolve the Prioritization Processor + val processorInstance = BluePrintDependencyService.instance>(name) + processorInstance.prioritizationConfiguration = prioritizationConfiguration + processorInstance + } + } + +} \ No newline at end of file -- cgit 1.2.3-korg