From 7c3b59560f015b65882a56db585b7d4bdd10d434 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 8 Jun 2018 16:29:31 +0200 Subject: Implement Kafka Sink Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'hv-collector-main/src') diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 4438cf38..b2f4633a 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.main import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import org.slf4j.LoggerFactory import kotlin.system.exitProcess @@ -39,7 +39,7 @@ fun main(args: Array) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.loggingSink() + AdapterFactory.kafkaSink() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { @@ -55,7 +55,7 @@ private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguratio if (serverConfiguration.configurationUrl.isEmpty()) { logger.info("Configuration url not specified - using default config") val sampleConfig = CollectorConfiguration( - kafkaBootstrapServers = "dmaap.cluster.local:9969", + kafkaBootstrapServers = "kafka:9092", routing = routing { defineRoute { fromDomain(Domain.HVRANMEAS) -- cgit 1.2.3-korg