From a0407eb91a2424f847e188796328871b3a339c93 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Fri, 8 Nov 2019 16:41:07 -0500 Subject: Add Kafka Streams consumer service Issue-ID: CCSDK-1914 Signed-off-by: Brinda Santh Change-Id: I8d2b51c66e1304decadbb55656fe8a0b4c018feb --- .../modules/commons/message-lib/pom.xml | 13 +++ .../message/BluePrintMessageLibConfiguration.kt | 2 + .../message/BluePrintMessageLibData.kt | 17 +++ .../service/BluePrintMessageLibPropertyService.kt | 15 +++ .../service/BlueprintMessageConsumerService.kt | 6 + .../KafkaBasicAuthMessageProducerService.kt | 6 +- .../KafkaStreamsBasicAuthConsumerService.kt | 70 ++++++++++++ .../KafkaStreamsBasicAuthConsumerServiceTest.kt | 126 +++++++++++++++++++++ .../message/service/MockKafkaTopologyComponents.kt | 103 +++++++++++++++++ 9 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml index f92a8f45a..8d08ae838 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml @@ -41,6 +41,19 @@ org.springframework.kafka spring-kafka + + org.apache.kafka + kafka-streams + + + org.apache.kafka + kafka-clients + + + org.apache.kafka + kafka-streams-test-utils + test + org.springframework.kafka spring-kafka-test diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 27a444bdc..ecffa280f 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,5 +63,6 @@ class MessageLibConstants { const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" } } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 184e85b70..d0c3d5ae1 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -17,6 +17,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import org.apache.kafka.streams.StreamsConfig + /** Producer Properties **/ open class MessageProducerProperties @@ -25,12 +27,27 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() lateinit var bootstrapServers: String var topic: String? = null var clientId: String? = null + // strongest producing guarantee + var acks: String = "all" + var retries: Int = 0 + // ensure we don't push duplicates + var enableIdempotence: Boolean = true } /** Consumer Properties **/ open class MessageConsumerProperties +open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { + lateinit var bootstrapServers: String + lateinit var applicationId: String + lateinit var topic: String + var autoOffsetReset: String = "latest" + var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE +} + +open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties() + open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 7c56ea432..563da9435 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +102,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { kafkaBasicAuthMessageConsumerProperties(prefix) } + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + kafkaStreamsBasicAuthMessageConsumerProperties(prefix) + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -113,6 +117,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -126,6 +133,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B is KafkaBasicAuthMessageConsumerProperties -> { return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) } + is KafkaStreamsBasicAuthConsumerProperties -> { + return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties) + } else -> { throw BluePrintProcessorException("couldn't get Message client service for") } @@ -137,4 +147,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B prefix, KafkaBasicAuthMessageConsumerProperties::class.java) } + private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { + return bluePrintProperties.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java) + } + } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 8bcc7580a..716fda609 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -20,6 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException @@ -61,4 +62,9 @@ interface BlueprintMessageConsumerService { interface KafkaConsumerRecordsFunction : ConsumerFunction { suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) +} + +interface KafkaStreamConsumerFunction : ConsumerFunction { + suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map?): Topology } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 42adcd712..ad9a594b0 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -65,9 +65,9 @@ class KafkaBasicAuthMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - log.info("message published offset(${metadata.offset()}, headers :$headers )") + log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") } - messageTemplate().send(record, callback).get() + messageTemplate().send(record, callback) return true } @@ -77,6 +77,8 @@ class KafkaBasicAuthMessageProducerService( configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ACKS_CONFIG] = messageProducerProperties.acks + configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt new file mode 100644 index 000000000..229e462da --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -0,0 +1,70 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsConfig +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.* + +open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) + : BlueprintMessageConsumerService { + + val log = logger(KafkaStreamsBasicAuthConsumerService::class) + lateinit var kafkaStreams: KafkaStreams + + private fun streamsConfig(additionalConfig: Map? = null): Properties { + val configProperties = Properties() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId + configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee + // TODO("Security Implementation based on type") + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return configProperties + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + val streamsConfig = streamsConfig(additionalConfig) + val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + kafkaStreams = KafkaStreams(topology, streamsConfig) + kafkaStreams.cleanUp() + kafkaStreams.start() + kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } + } + + override suspend fun shutDown() { + if (kafkaStreams != null) { + kafkaStreams.close() + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt new file mode 100644 index 000000000..e2a31f40a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt @@ -0,0 +1,126 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.apache.kafka.streams.state.Stores +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull + + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class]) +@TestPropertySource(properties = +[ + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" + +]) +class KafkaStreamsBasicAuthConsumerServiceTest { + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testProperties() { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testKafkaStreamingMessageConsumer() { + runBlocking { + val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + + // Dynamic Consumer Function to create Topology + val consumerFunction = object : KafkaStreamConsumerFunction { + override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map?): Topology { + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + topology.addSource("Source", *topics.toTypedArray()) + // Processor Supplier + val firstProcessorSupplier = object : ProcessorSupplier { + override fun get(): Processor { + return FirstProcessor() + } + } + val changelogConfig: MutableMap = hashMapOf() + changelogConfig.put("min.insync.replicas", "1") + + // Store Buolder + val countStoreSupplier = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde()) + .withLoggingEnabled(changelogConfig) + + topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") + topology.addStateStore(countStoreSupplier, "FirstProcessor") + topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor") + return topology + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + launch { + repeat(5) { + delay(1000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) + } + } + streamingConsumerService.consume(null, consumerFunction) + delay(10000) + streamingConsumerService.shutDown() + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt new file mode 100644 index 000000000..4db9c772e --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt @@ -0,0 +1,103 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.state.KeyValueStore +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.io.Serializable +import java.nio.charset.Charset +import java.util.* + +class PriorityMessage : Serializable { + lateinit var id: String + lateinit var requestMessage: String +} + +open class PriorityMessageSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): PriorityMessage { + return JacksonUtils.readValue(String(data), PriorityMessage::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: PriorityMessage): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} + + +class FirstProcessor : Processor { + + private val log = logger(FirstProcessor::class) + + private lateinit var context: ProcessorContext + private lateinit var kvStore: KeyValueStore + + override fun process(key: ByteArray, value: ByteArray) { + log.info("First Processor key(${String(key)} : value(${String(value)})") + val newMessage = PriorityMessage().apply { + id = UUID.randomUUID().toString() + requestMessage = String(value) + } + kvStore.put(newMessage.id, newMessage) + this.context.forward(newMessage.id, newMessage) + } + + override fun init(context: ProcessorContext) { + log.info("init... ${context.keySerde()}, ${context.valueSerde()}") + this.context = context + this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore + } + + override fun close() { + log.info("Close...") + } +} \ No newline at end of file -- cgit 1.2.3-korg