summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt122
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt62
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt82
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt37
8 files changed, 496 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
new file mode 100644
index 00000000..8c16736d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import reactor.netty.http.client.HttpClient
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+ fun loggingSink(): SinkProvider = LoggingSinkProvider()
+
+ fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
+
+ private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
new file mode 100644
index 00000000..ec7c60c0
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.io.StringReader
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+import javax.json.Json
+import javax.json.JsonObject
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConsulConfigurationProvider(private val http: HttpAdapter,
+ private val url: String,
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration,
+ private val healthState: HealthState,
+ retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+
+ private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
+ private val retry = retrySpec
+ .doOnRetry {
+ logger.warn("Could not get fresh configuration", it.exception())
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ }
+
+ constructor(http: HttpAdapter,
+ params: ConfigurationProviderParams) : this(
+ http,
+ params.configurationUrl,
+ params.firstRequestDelay,
+ params.requestInterval,
+ HealthState.INSTANCE,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
+ .jitter(Jitter.random())
+ )
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ Flux.interval(firstRequestDelay, requestInterval)
+ .concatMap { askForConfig() }
+ .flatMap(::filterDifferentValues)
+ .map(::parseJsonResponse)
+ .map(::createCollectorConfiguration)
+ .retryWhen(retry)
+
+ private fun askForConfig(): Mono<String> = http.get(url)
+
+ private fun filterDifferentValues(configurationString: String) =
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ Mono.empty()
+ } else {
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
+ }
+
+ private fun hashOf(str: String) = str.hashCode()
+
+ private fun parseJsonResponse(responseString: String): JsonObject =
+ Json.createReader(StringReader(responseString)).readObject()
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+ logger.info { "Obtained new configuration from consul:\n${configuration}" }
+ val routing = configuration.getJsonArray("collector.routing")
+
+ return CollectorConfiguration(
+ kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
+ routing = org.onap.dcae.collectors.veshv.model.routing {
+ for (route in routing) {
+ val routeObj = route.asJsonObject()
+ defineRoute {
+ fromDomain(routeObj.getString("fromDomain"))
+ toTopic(routeObj.getString("toTopic"))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ }
+
+ companion object {
+ private const val MAX_RETRIES = 5L
+ private const val BACKOFF_INTERVAL_FACTOR = 30L
+ private val logger = Logger(ConsulConfigurationProvider::class)
+ }
+}
+
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
new file mode 100644
index 00000000..bdce6f73
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import io.netty.handler.codec.http.HttpStatusClass
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Mono
+import reactor.netty.http.client.HttpClient
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+open class HttpAdapter(private val httpClient: HttpClient) {
+
+ private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
+
+ open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
+ logger.debug("Nested exception:", it)
+ }
+
+ private fun createQueryString(params: Map<String, Any>): String {
+ if (params.isEmpty())
+ return ""
+
+ val builder = StringBuilder("?")
+ params.forEach { (key, value) ->
+ builder
+ .append(key)
+ .append("=")
+ .append(value)
+ .append("&")
+
+ }
+
+ return builder.removeSuffix("&").toString()
+ }
+
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
new file mode 100644
index 00000000..5f4bf354
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class LoggingSinkProvider : SinkProvider {
+
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return object : Sink {
+ private val totalMessages = AtomicLong()
+ private val totalBytes = AtomicLong()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
+ messages
+ .doOnNext(this::logMessage)
+
+ private fun logMessage(msg: RoutedMessage) {
+ val msgs = totalMessages.addAndGet(1)
+ val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+ val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
+ if (msgs % INFO_LOGGING_FREQ == 0L)
+ logger.info(logMessageSupplier)
+ else
+ logger.trace(logMessageSupplier)
+ }
+
+ }
+ }
+
+ companion object {
+ const val INFO_LOGGING_FREQ = 100_000
+ private val logger = Logger(LoggingSinkProvider::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
new file mode 100644
index 00000000..a0c22418
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderRecord
+import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+ private val sentMessages = AtomicLong(0)
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ val records = messages.map(this::vesToKafkaRecord)
+ val result = sender.send(records)
+ .doOnNext(::logException)
+ .filter(::isSuccessful)
+ .map { it.correlationMetadata() }
+
+ return if (logger.traceEnabled) {
+ result.doOnNext(::logSentMessage)
+ } else {
+ result
+ }
+ }
+
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
+ return SenderRecord.create(
+ msg.topic,
+ msg.partition,
+ System.currentTimeMillis(),
+ msg.message.header,
+ msg.message,
+ msg)
+ }
+
+ private fun logException(senderResult: SenderResult<out Any>) {
+ if (senderResult.exception() != null) {
+ logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
+ }
+ }
+
+ private fun logSentMessage(sentMsg: RoutedMessage) {
+ logger.trace {
+ val msgNum = sentMessages.incrementAndGet()
+ "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+
+ companion object {
+ val logger = Logger(KafkaSink::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
new file mode 100644
index 00000000..18191952
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ }
+
+ private fun constructSenderOptions(config: CollectorConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
new file mode 100644
index 00000000..4e9932cc
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import org.apache.kafka.common.serialization.Serializer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ProtobufSerializer : Serializer<MessageLite> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // no configuration
+ }
+
+ override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
+ data?.toByteArray()
+
+ override fun close() {
+ // cleanup not needed
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
new file mode 100644
index 00000000..7a6ac7c8
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.dcae.collectors.veshv.model.VesMessage
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class VesMessageSerializer : Serializer<VesMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+
+ override fun close() {
+ }
+}