aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt3
13 files changed, 168 insertions, 76 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index ac55e55f..e4a73947 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
interface Metrics {
@@ -41,14 +42,14 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+ operator fun invoke(ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+ override fun invoke(
+ ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2008fc35..fe2b89d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
+
return { ctx: ClientContext ->
- config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
- clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
- protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
- sink = sinkProvider(config, ctx),
- metrics = metrics)
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 5c3f339c..fd01c9d8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
@@ -96,10 +98,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext(metrics::notifyMessageSent)
+ .doOnNext(this::updateSinkMetrics)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
@@ -108,6 +110,15 @@ internal class VesHvCollector(
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
+ private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
+ when (consumedMessage) {
+ is SuccessfullyConsumedMessage ->
+ metrics.notifyMessageSent(consumedMessage.message)
+ is FailedToConsumeMessage ->
+ metrics.notifyMessageDropped(consumedMessage.cause)
+ }
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8c16736d..75b6f0a6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import reactor.netty.http.client.HttpClient
/**
@@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient
* @since May 2018
*/
object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
+ fun sinkCreatorFactory(dummyMode: Boolean,
+ kafkaConfig: KafkaConfiguration): SinkProvider =
+ if (dummyMode)
+ LoggingSinkProvider()
+ else
+ KafkaSinkProvider(kafkaConfig)
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index e4453c90..717da092 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
@@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- val routing = configuration.getJsonArray("collector.routing")
+ val routingArray = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
+ routing {
+ for (route in routingArray) {
val routeObj = route.asJsonObject()
defineRoute {
fromDomain(routeObj.getString("fromDomain"))
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 14966d9b..7535fbee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ override fun invoke(ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b4f9a90c..73c852d6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,19 +20,20 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
private val ctx: ClientContext) : Sink {
- private val sentMessages = AtomicLong(0)
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext {
- if (it.isSuccessful()) {
- Mono.just(it)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.map(::vesToKafkaRecord).let { records ->
+ sender.send(records).map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
} else {
- logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
- Mono.empty<SenderResult<RoutedMessage>>()
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
}
- .map { it.correlationMetadata() }
-
- return result.doOnNext(::logSentMessage)
- }
+ }
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
+ private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
+ SenderRecord.create(
+ routed.topic,
+ routed.partition,
+ FILL_TIMESTAMP_LATER,
+ routed.message.header,
+ routed.message,
+ routed)
- private fun SenderResult<out Any>.isSuccessful() = exception() == null
+ internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
companion object {
- val logger = Logger(KafkaSink::class)
+ private val FILL_TIMESTAMP_LATER: Long? = null
+ private val logger = Logger(KafkaSink::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index b4f470d4..2fa4f545 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,11 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
- }
+internal class KafkaSinkProvider internal constructor(
+ private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
+
+ constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+
+ override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ companion object {
+ private fun constructKafkaSender(config: KafkaConfiguration) =
+ KafkaSender.create(constructSenderOptions(config))
+
+ private fun constructSenderOptions(config: KafkaConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(RETRIES_CONFIG, 1)
+ .producerProperty(ACKS_CONFIG, "1")
+ .stopOnError(false)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
index ec546c7d..f65b97ca 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
+data class CollectorConfiguration(val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
new file mode 100644
index 00000000..f65e157d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class KafkaConfiguration(val bootstrapServers: String)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 85117684..7e5044f9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val serverListenAddress: InetSocketAddress,
+ val kafkaConfiguration: KafkaConfiguration,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
new file mode 100644
index 00000000..29c418a4
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.lang.Exception
+
+sealed class ConsumedMessage {
+ abstract val message: RoutedMessage
+}
+
+data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage()
+
+data class FailedToConsumeMessage(
+ override val message: RoutedMessage,
+ val exception: Exception?,
+ val cause: MessageDropCause) : ConsumedMessage()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index 836eab53..ab7b196a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
*/
enum class MessageDropCause(val tag: String) {
ROUTE_NOT_FOUND("routing"),
- INVALID_MESSAGE("invalid")
+ INVALID_MESSAGE("invalid"),
+ KAFKA_FAILURE("kafka")
}
enum class ClientRejectionCause(val tag: String) {