diff options
11 files changed, 206 insertions, 50 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt index d89f71364..d114da521 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt @@ -34,7 +34,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa this.processorContext = context /** Get the State service to update in store */ this.messagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) + .messagePrioritizationStateService() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ef9d5a058..967cc190e 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService @@ -74,9 +75,12 @@ open class MessagePrioritizationConsumer( Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + /** To receive completed and error messages */ topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, prioritizationConfiguration.outputTopic, Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, MessagePrioritizationConstants.PROCESSOR_OUTPUT) // Output will be sent to the group-output topic from Processor API diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index d874cef92..3358a5643 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -28,8 +28,8 @@ enum class MessageState(val id: String) { EXPIRED("expired"), PRIORITIZED("prioritized"), AGGREGATED("aggregated"), - IGNORED("ignored"), COMPLETED("completed"), + ERROR("error") } open class PrioritizationConfiguration : Serializable { @@ -59,7 +59,6 @@ open class UpdateStateRequest : Serializable { lateinit var id: String var group: String? = null var state: String? = null - var notifyMessage: String? = null } data class CorrelationCheckResponse(var message: String? = null, diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 94fedf4df..ec061ad47 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -16,21 +16,27 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) - : ProcessorSupplier<K, V> { - return ProcessorSupplier<K, V> { - // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance - } -} +/** + * Register the MessagePrioritizationStateService and exposed dependency + */ +fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = + instance(MessagePrioritizationStateService::class) +/** + * Expose messagePrioritizationStateService to AbstractComponentFunction + */ +fun AbstractComponentFunction.messagePrioritizationStateService() = + BluePrintDependencyService.messagePrioritizationStateService() + +/** + * MessagePrioritization correlation extensions + */ fun MessagePrioritization.toFormatedCorrelation(): String { val ascendingKey = this.correlationId!!.split(",") .map { it.trim() }.sorted().joinToString(",") diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt new file mode 100644 index 000000000..382cb9c8a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -0,0 +1,55 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc +import org.springframework.http.MediaType +import org.springframework.web.bind.annotation.* + +@RestController +@RequestMapping(value = ["/api/v1/message-prioritization"]) +open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) { + + @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun ping(): String = "Success" + + + @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc { + messagePrioritizationStateService.getMessage(id) + } + + @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { + messagePrioritizationStateService.saveMessage(messagePrioritization) + } + + @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE]) + fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = + monoMdc { + messagePrioritizationStateService.setMessageState(updateMessageState.id, + updateMessageState.state!!) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt index 307d932a9..69c81079d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt @@ -71,19 +71,27 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Modifying @Transactional - @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id") - fun setStatusForMessageId(id: String, state: String): Int + @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id = :id") + fun setStateForMessageId(id: String, state: String, currentDate: Date): Int @Modifying @Transactional - @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids") - fun setStatusForMessageIds(ids: List<String>, state: String): Int + @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids") + fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int @Modifying @Transactional - @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " + - "WHERE pm.id = :id") - fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int + @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id") + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id") + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int @Modifying @Transactional diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt index 4973cdf6e..1825f91c2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt @@ -49,6 +49,10 @@ open class MessagePrioritization { var message: String? = null @Lob + @Column(name = "error", nullable = true) + var error: String? = null + + @Lob @Column(name = "aggregated_message_ids", nullable = true) var aggregatedMessageIds: String? = null diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt index e4369fc20..8424226c2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -33,15 +33,20 @@ interface MessagePrioritizationStateService { suspend fun getMessage(id: String): MessagePrioritization + suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? + suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>? + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int) + : List<MessagePrioritization>? - suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>? + suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int) + : List<MessagePrioritization>? suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? - suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>? + suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, + correlationIds: String): List<MessagePrioritization>? suspend fun updateMessagesState(ids: List<String>, state: String) @@ -51,7 +56,9 @@ interface MessagePrioritizationStateService { suspend fun setMessagesState(ids: List<String>, state: String) - suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) suspend fun deleteMessage(id: String) @@ -64,7 +71,8 @@ interface MessagePrioritizationStateService { @Service open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService { + private val prioritizationMessageRepository: PrioritizationMessageRepository) + : MessagePrioritizationStateService { private val log = logger(MessagePrioritizationStateServiceImpl::class) @@ -82,6 +90,10 @@ open class MessagePrioritizationStateServiceImpl( ?: throw BluePrintProcessorException("couldn't find message for id($id)") } + override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? { + return prioritizationMessageRepository.findAllById(ids) + } + override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? { return prioritizationMessageRepository .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), @@ -115,6 +127,7 @@ open class MessagePrioritizationStateServiceImpl( } } + @Transactional override suspend fun updateMessagesState(ids: List<String>, state: String) { ids.forEach { val updated = updateMessageState(it, state) @@ -124,12 +137,17 @@ open class MessagePrioritizationStateServiceImpl( @Transactional override suspend fun setMessageState(id: String, state: String) { - prioritizationMessageRepository.setStatusForMessageId(id, state) + prioritizationMessageRepository.setStateForMessageId(id, state, Date()) } @Transactional override suspend fun setMessagesState(ids: List<String>, state: String) { - prioritizationMessageRepository.setStatusForMessageIds(ids, state) + prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) + } + + @Transactional + override suspend fun setMessageStateANdError(id: String, state: String, error: String) { + prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) } @Transactional @@ -141,16 +159,10 @@ open class MessagePrioritizationStateServiceImpl( return saveMessage(updateMessage) } - override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>) - : MessagePrioritization { - - val groupedIds = groupedMessageIds.joinToString(",") - val updateMessage = getMessage(id).apply { - this.updatedDate = Date() - this.state = state - this.aggregatedMessageIds = groupedIds - } - return saveMessage(updateMessage) + @Transactional + override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) { + val groupedIds = aggregatedIds.joinToString(",") + prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) } override suspend fun deleteMessage(id: String) { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt index 8dd4019dd..45f5c773d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -32,14 +32,36 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String log.info("@@@@@ received in aggregation processor key($key), value($value)") val ids = value.split(",").map { it.trim() } if (!ids.isNullOrEmpty()) { - if (ids.size == 1) { - processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) - } else { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(ids) - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + try { + if (ids.size == 1) { + processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) + } else { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(ids) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + } + } catch (e: Exception) { + val error = "failed in Aggregate message($ids) : ${e.message}" + log.error(error, e) + val storeMessages = messagePrioritizationStateService.getMessages(ids) + if (!storeMessages.isNullOrEmpty()) { + storeMessages.forEach { messagePrioritization -> + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id, + MessageState.ERROR.name, error) + /** Publish to Error topic */ + this.processorContext.forward(messagePrioritization.id, messagePrioritization, + To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } catch (sendException: Exception) { + log.error("failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e) + } + + } + } } } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 5a5aa2575..7dde2655d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -41,12 +41,22 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA override suspend fun processNB(key: ByteArray, value: ByteArray) { log.info("***** received in prioritize processor key(${String(key)})") - val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") - // Save the Message - messagePrioritizationStateService.saveMessage(data) - handleCorrelationAndNextStep(data) - + try { + // Save the Message + messagePrioritizationStateService.saveMessage(messagePrioritize) + handleCorrelationAndNextStep(messagePrioritize) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!!) + /** Publish to Output topic */ + this.processorContext.forward(messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } } override fun init(context: ProcessorContext) { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt new file mode 100644 index 000000000..02614d821 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -0,0 +1,36 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +object MessageProcessorUtils { + + fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) + : ProcessorSupplier<K, V> { + return ProcessorSupplier<K, V> { + // Dynamically resolve the Prioritization Processor + val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) + processorInstance.prioritizationConfiguration = prioritizationConfiguration + processorInstance + } + } + +}
\ No newline at end of file |