aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/Dockerfile2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt5
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt6
5 files changed, 14 insertions, 4 deletions
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index ad7a03d6..3322059c 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -11,7 +11,7 @@ RUN apt-get update \
WORKDIR /opt/ves-hv-collector
-ENTRYPOINT ["entry.sh"]
+ENTRYPOINT ["./entry.sh"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 9b985f6f..ae87f1c2 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
@@ -52,6 +54,7 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
+ KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
CONSUL_CONFIG_URL,
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
HEALTH_CHECK_API_PORT,
DefaultValues.HEALTH_CHECK_API_PORT
)
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers),
healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 288145aa..f3bcf381 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor(
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0)
}
registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index f9be546a..4e2e6d86 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -36,10 +36,9 @@ object VesServer : ServerStarter() {
override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
private fun createVesServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
+ AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 1aac6a09..9dddeca9 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -39,6 +39,7 @@ import kotlin.test.assertNotNull
*/
object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
+ val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
@@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
"--config-url", configurationUrl,
@@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({
)
}
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
it("should set proper listen port") {
assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
}