summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt2
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt4
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt26
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt55
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt22
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt4
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt46
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt38
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt20
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt36
11 files changed, 206 insertions, 50 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
index d89f71364..d114da521 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
@@ -34,7 +34,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
this.processorContext = context
/** Get the State service to update in store */
this.messagePrioritizationStateService = BluePrintDependencyService
- .instance(MessagePrioritizationStateService::class)
+ .messagePrioritizationStateService()
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
index ef9d5a058..967cc190e 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
@@ -74,9 +75,12 @@ open class MessagePrioritizationConsumer(
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+ /** To receive completed and error messages */
topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
prioritizationConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
MessagePrioritizationConstants.PROCESSOR_OUTPUT)
// Output will be sent to the group-output topic from Processor API
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index d874cef92..3358a5643 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -28,8 +28,8 @@ enum class MessageState(val id: String) {
EXPIRED("expired"),
PRIORITIZED("prioritized"),
AGGREGATED("aggregated"),
- IGNORED("ignored"),
COMPLETED("completed"),
+ ERROR("error")
}
open class PrioritizationConfiguration : Serializable {
@@ -59,7 +59,6 @@ open class UpdateStateRequest : Serializable {
lateinit var id: String
var group: String? = null
var state: String? = null
- var notifyMessage: String? = null
}
data class CorrelationCheckResponse(var message: String? = null,
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 94fedf4df..ec061ad47 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -16,21 +16,27 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-import org.apache.kafka.streams.processor.ProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
- : ProcessorSupplier<K, V> {
- return ProcessorSupplier<K, V> {
- // Dynamically resolve the Prioritization Processor
- val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
- processorInstance.prioritizationConfiguration = prioritizationConfiguration
- processorInstance
- }
-}
+/**
+ * Register the MessagePrioritizationStateService and exposed dependency
+ */
+fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService =
+ instance(MessagePrioritizationStateService::class)
+/**
+ * Expose messagePrioritizationStateService to AbstractComponentFunction
+ */
+fun AbstractComponentFunction.messagePrioritizationStateService() =
+ BluePrintDependencyService.messagePrioritizationStateService()
+
+/**
+ * MessagePrioritization correlation extensions
+ */
fun MessagePrioritization.toFormatedCorrelation(): String {
val ascendingKey = this.correlationId!!.split(",")
.map { it.trim() }.sorted().joinToString(",")
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
new file mode 100644
index 000000000..382cb9c8a
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
+import org.springframework.http.MediaType
+import org.springframework.web.bind.annotation.*
+
+@RestController
+@RequestMapping(value = ["/api/v1/message-prioritization"])
+open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+
+ @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
+ @ResponseBody
+ fun ping(): String = "Success"
+
+
+ @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
+ @ResponseBody
+ fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc {
+ messagePrioritizationStateService.getMessage(id)
+ }
+
+ @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE])
+ @ResponseBody
+ fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+ messagePrioritizationStateService.saveMessage(messagePrioritization)
+ }
+
+ @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE])
+ fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) =
+ monoMdc {
+ messagePrioritizationStateService.setMessageState(updateMessageState.id,
+ updateMessageState.state!!)
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
index 307d932a9..69c81079d 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
@@ -71,19 +71,27 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Modifying
@Transactional
- @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id")
- fun setStatusForMessageId(id: String, state: String): Int
+ @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id = :id")
+ fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
@Modifying
@Transactional
- @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids")
- fun setStatusForMessageIds(ids: List<String>, state: String): Int
+ @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id IN :ids")
+ fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
@Modifying
@Transactional
- @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " +
- "WHERE pm.id = :id")
- fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int
+ @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
+ "WHERE id = :id")
+ fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query("UPDATE MessagePrioritization SET state = :state, " +
+ "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id")
+ fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
@Modifying
@Transactional
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
index 4973cdf6e..1825f91c2 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
@@ -49,6 +49,10 @@ open class MessagePrioritization {
var message: String? = null
@Lob
+ @Column(name = "error", nullable = true)
+ var error: String? = null
+
+ @Lob
@Column(name = "aggregated_message_ids", nullable = true)
var aggregatedMessageIds: String? = null
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
index e4369fc20..8424226c2 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
@@ -33,15 +33,20 @@ interface MessagePrioritizationStateService {
suspend fun getMessage(id: String): MessagePrioritization
+ suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
+ : List<MessagePrioritization>?
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
+ : List<MessagePrioritization>?
suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
- suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>?
+ suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
+ correlationIds: String): List<MessagePrioritization>?
suspend fun updateMessagesState(ids: List<String>, state: String)
@@ -51,7 +56,9 @@ interface MessagePrioritizationStateService {
suspend fun setMessagesState(ids: List<String>, state: String)
- suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization
+ suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+ suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
suspend fun deleteMessage(id: String)
@@ -64,7 +71,8 @@ interface MessagePrioritizationStateService {
@Service
open class MessagePrioritizationStateServiceImpl(
- private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService {
+ private val prioritizationMessageRepository: PrioritizationMessageRepository)
+ : MessagePrioritizationStateService {
private val log = logger(MessagePrioritizationStateServiceImpl::class)
@@ -82,6 +90,10 @@ open class MessagePrioritizationStateServiceImpl(
?: throw BluePrintProcessorException("couldn't find message for id($id)")
}
+ override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findAllById(ids)
+ }
+
override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
return prioritizationMessageRepository
.findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
@@ -115,6 +127,7 @@ open class MessagePrioritizationStateServiceImpl(
}
}
+ @Transactional
override suspend fun updateMessagesState(ids: List<String>, state: String) {
ids.forEach {
val updated = updateMessageState(it, state)
@@ -124,12 +137,17 @@ open class MessagePrioritizationStateServiceImpl(
@Transactional
override suspend fun setMessageState(id: String, state: String) {
- prioritizationMessageRepository.setStatusForMessageId(id, state)
+ prioritizationMessageRepository.setStateForMessageId(id, state, Date())
}
@Transactional
override suspend fun setMessagesState(ids: List<String>, state: String) {
- prioritizationMessageRepository.setStatusForMessageIds(ids, state)
+ prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
+ }
+
+ @Transactional
+ override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
+ prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
}
@Transactional
@@ -141,16 +159,10 @@ open class MessagePrioritizationStateServiceImpl(
return saveMessage(updateMessage)
}
- override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>)
- : MessagePrioritization {
-
- val groupedIds = groupedMessageIds.joinToString(",")
- val updateMessage = getMessage(id).apply {
- this.updatedDate = Date()
- this.state = state
- this.aggregatedMessageIds = groupedIds
- }
- return saveMessage(updateMessage)
+ @Transactional
+ override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
+ val groupedIds = aggregatedIds.joinToString(",")
+ prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
}
override suspend fun deleteMessage(id: String) {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
index 8dd4019dd..45f5c773d 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
@@ -32,14 +32,36 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String
log.info("@@@@@ received in aggregation processor key($key), value($value)")
val ids = value.split(",").map { it.trim() }
if (!ids.isNullOrEmpty()) {
- if (ids.size == 1) {
- processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ try {
+ if (ids.size == 1) {
+ processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ } catch (e: Exception) {
+ val error = "failed in Aggregate message($ids) : ${e.message}"
+ log.error(error, e)
+ val storeMessages = messagePrioritizationStateService.getMessages(ids)
+ if (!storeMessages.isNullOrEmpty()) {
+ storeMessages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id,
+ MessageState.ERROR.name, error)
+ /** Publish to Error topic */
+ this.processorContext.forward(messagePrioritization.id, messagePrioritization,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ } catch (sendException: Exception) {
+ log.error("failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e)
+ }
+
+ }
+ }
}
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
index 5a5aa2575..7dde2655d 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -41,12 +41,22 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
override suspend fun processNB(key: ByteArray, value: ByteArray) {
log.info("***** received in prioritize processor key(${String(key)})")
- val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
- // Save the Message
- messagePrioritizationStateService.saveMessage(data)
- handleCorrelationAndNextStep(data)
-
+ try {
+ // Save the Message
+ messagePrioritizationStateService.saveMessage(messagePrioritize)
+ handleCorrelationAndNextStep(messagePrioritize)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!)
+ /** Publish to Output topic */
+ this.processorContext.forward(messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
}
override fun init(context: ProcessorContext) {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
new file mode 100644
index 000000000..02614d821
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+object MessageProcessorUtils {
+
+ fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
+ : ProcessorSupplier<K, V> {
+ return ProcessorSupplier<K, V> {
+ // Dynamically resolve the Prioritization Processor
+ val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+ processorInstance.prioritizationConfiguration = prioritizationConfiguration
+ processorInstance
+ }
+ }
+
+} \ No newline at end of file