summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/Dockerfile19
-rw-r--r--sources/hv-collector-main/pom.xml143
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt118
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt67
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt55
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt41
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt42
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt52
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml49
-rwxr-xr-xsources/hv-collector-main/src/main/scripts/healthcheck.sh4
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt138
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt191
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt88
13 files changed, 1007 insertions, 0 deletions
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
new file mode 100644
index 00000000..0170b645
--- /dev/null
+++ b/sources/hv-collector-main/Dockerfile
@@ -0,0 +1,19 @@
+FROM docker.io/openjdk:11-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends curl netcat \
+ && apt-get clean
+
+WORKDIR /opt/ves-hv-collector
+
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
+
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY src/main/scripts/healthcheck.sh ./
+COPY target/hv-collector-main-*.jar ./
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
new file mode 100644
index 00000000..a94d6346
--- /dev/null
+++ b/sources/hv-collector-main/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-main</artifactId>
+ <description>VES HighVolume Collector :: Main</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-health-check</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <!-- See comment in main pom
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ -->
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-jmx</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
new file mode 100644
index 00000000..9b985f6f
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Option
+import arrow.core.fix
+import arrow.instances.option.monad.monad
+import arrow.typeclasses.binding
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.hasOption
+import org.onap.dcae.collectors.veshv.utils.commandline.intValue
+import org.onap.dcae.collectors.veshv.utils.commandline.longValue
+import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
+import java.time.Duration
+
+internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList = listOf(
+ HEALTH_CHECK_API_PORT,
+ LISTEN_PORT,
+ CONSUL_CONFIG_URL,
+ CONSUL_FIRST_REQUEST_DELAY,
+ CONSUL_REQUEST_INTERVAL,
+ SSL_DISABLE,
+ KEY_STORE_FILE,
+ KEY_STORE_PASSWORD,
+ TRUST_STORE_FILE,
+ TRUST_STORE_PASSWORD,
+ IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DUMMY_MODE
+ )
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
+ Option.monad().binding {
+ val healthCheckApiPort = cmdLine.intValue(
+ HEALTH_CHECK_API_PORT,
+ DefaultValues.HEALTH_CHECK_API_PORT
+ )
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
+ val dummyMode = cmdLine.hasOption(DUMMY_MODE)
+ val security = createSecurityConfiguration(cmdLine).bind()
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
+ ServerConfiguration(
+ serverListenAddress = InetSocketAddress(listenPort),
+ healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
+ configurationProviderParams = configurationProviderParams,
+ securityConfiguration = security,
+ idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
+ dummyMode = dummyMode)
+ }.fix()
+
+ private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
+ Option.monad().binding {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+ val firstRequestDelay = cmdLine.longValue(
+ CONSUL_FIRST_REQUEST_DELAY,
+ DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ )
+ val requestInterval = cmdLine.longValue(
+ CONSUL_REQUEST_INTERVAL,
+ DefaultValues.CONSUL_REQUEST_INTERVAL
+ )
+ ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }.fix()
+
+ internal object DefaultValues {
+ const val HEALTH_CHECK_API_PORT = 6060
+ const val CONSUL_FIRST_REQUEST_DELAY = 10L
+ const val CONSUL_REQUEST_INTERVAL = 5L
+ const val IDLE_TIMEOUT_SEC = 60L
+ const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
+ }
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
new file mode 100644
index 00000000..8a8b6d39
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Clock
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.jmx.JmxConfig
+import io.micrometer.jmx.JmxMeterRegistry
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class MicrometerMetrics(
+ private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)
+) : Metrics {
+
+ private val receivedBytes = registry.counter("data.received.bytes")
+ private val receivedMsgCount = registry.counter("messages.received.count")
+ private val receivedMsgBytes = registry.counter("messages.received.bytes")
+ private val sentCountTotal = registry.counter("messages.sent.count")
+
+ init {
+ registry.gauge("messages.processing.count", this) {
+ (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ }
+ }
+
+ private val sentCount = { topic: String ->
+ registry.counter("messages.sent.count", "topic", topic)
+ }.memoize<String, Counter>()
+
+
+ override fun notifyBytesReceived(size: Int) {
+ receivedBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ receivedMsgCount.increment()
+ receivedMsgBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ sentCountTotal.increment()
+ sentCount(topic).increment()
+ }
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
new file mode 100644
index 00000000..899f51fb
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
+import org.onap.dcae.collectors.veshv.main.servers.VesServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
+
+fun main(args: Array<String>) =
+ ArgVesHvConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startAndAwaitServers)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.error("Failed to start a server", ex)
+ ExitFailure(1)
+ },
+ { logger.info("Gentle shutdown") }
+ )
+
+private fun startAndAwaitServers(config: ServerConfiguration) =
+ IO.monad().binding {
+ logger.info("Using configuration: $config")
+ HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind()
+ .await().bind()
+ }.fix()
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
new file mode 100644
index 00000000..ae59da69
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object HealthCheckServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+
+ private fun createHealthCheckServer(config: ServerConfiguration) =
+ HealthCheckApiServer(
+ HealthState.INSTANCE,
+ config.healthCheckApiListenAddress)
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
new file mode 100644
index 00000000..5c6f1277
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerStarter {
+ fun start(config: ServerConfiguration): IO<ServerHandle> =
+ startServer(config)
+ .map { logger.info(serverStartedMessage(it)); it }
+
+ protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
+ protected abstract fun serverStartedMessage(handle: ServerHandle): String
+
+ companion object {
+ private val logger = Logger(ServerStarter::class)
+ }
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
new file mode 100644
index 00000000..d788c164
--- /dev/null
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object VesServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
+
+ private fun createVesServer(config: ServerConfiguration): Server {
+ val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+ val collectorProvider = CollectorFactory(
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics(),
+ config.maximumPayloadSizeBytes
+ ).createVesHvCollectorProvider()
+
+ return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ }
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
new file mode 100644
index 00000000..c76ff21a
--- /dev/null
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="COMPONENT_NAME"
+ value="dcae-hv-ves-collector"/>
+ <property name="COMPONENT_SHORT_NAME"
+ value="hv-ves"/>
+
+ <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+ <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+ <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+ <property name="FILE_LOG_PATTERN" value="
+%nopexception%50.50logger
+| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
+| %highlight(%-5level)
+| %msg
+| %rootException
+| %thread%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
+ <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/sources/hv-collector-main/src/main/scripts/healthcheck.sh b/sources/hv-collector-main/src/main/scripts/healthcheck.sh
new file mode 100755
index 00000000..db62eece
--- /dev/null
+++ b/sources/hv-collector-main/src/main/scripts/healthcheck.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+
+curl -f http://localhost:${VESHV_HEALTH_CHECK_API_PORT:-6060}/health/ready || exit 1
+nc -vz localhost ${VESHV_LISTEN_PORT:-6061} || exit 2
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
new file mode 100644
index 00000000..1aac6a09
--- /dev/null
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -0,0 +1,138 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.JdkKeys
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
+import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
+import java.time.Duration
+import kotlin.test.assertNotNull
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ArgVesHvConfigurationTest : Spek({
+ lateinit var cut: ArgVesHvConfiguration
+ val healthCheckApiPort = "6070"
+ val configurationUrl = "http://test-address/test"
+ val firstRequestDelay = "10"
+ val requestInterval = "5"
+ val listenPort = "6969"
+ val keyStorePassword = "kspass"
+ val trustStorePassword = "tspass"
+
+ beforeEachTest {
+ cut = ArgVesHvConfiguration()
+ }
+
+ describe("parsing arguments") {
+ given("all parameters are present in the long form") {
+ lateinit var result: ServerConfiguration
+
+ beforeEachTest {
+ result = cut.parseExpectingSuccess(
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword
+ )
+ }
+
+ it("should set proper listen port") {
+ assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
+ }
+
+
+ it("should set default listen address") {
+ assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
+ }
+
+ it("should set proper health check api port") {
+ assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt())
+ }
+
+ it("should set default health check api address") {
+ assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
+ }
+
+ it("should set proper first consul request delay") {
+ assertThat(result.configurationProviderParams.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
+ }
+
+ it("should set proper consul request interval") {
+ assertThat(result.configurationProviderParams.requestInterval)
+ .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
+ }
+
+ it("should set proper config url") {
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(configurationUrl)
+ }
+
+ it("should set proper security configuration") {
+ assertThat(result.securityConfiguration.sslDisable).isFalse()
+
+ val keys = result.securityConfiguration.keys.orNull() as JdkKeys
+ assertNotNull(keys.keyStore)
+ assertNotNull(keys.trustStore)
+ assertThat(keys.keyStorePassword).isEqualTo(keyStorePassword.toCharArray())
+ assertThat(keys.trustStorePassword).isEqualTo(trustStorePassword.toCharArray())
+ }
+ }
+
+ describe("required parameter is absent") {
+ on("missing listen port") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "--config-url", configurationUrl,
+ "--ssl-disable",
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval)
+ ).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+ on("missing configuration url") {
+ it("should throw exception") {
+ assertThat(cut.parseExpectingFailure(
+ "--listen-port", listenPort,
+ "--ssl-disable",
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval)
+ ).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
new file mode 100644
index 00000000..a379933e
--- /dev/null
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -0,0 +1,191 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Try
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.search.RequiredSearch
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object MicrometerMetricsTest : Spek({
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: SimpleMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = SimpleMeterRegistry()
+ cut = MicrometerMetrics(registry)
+ }
+
+ fun registrySearch() = RequiredSearch.`in`(registry)
+
+ fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
+ Try {
+ map(search)
+ }.fold(
+ { ex -> assertThat(ex).doesNotThrowAnyException() },
+ verifier
+ )
+
+ fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
+ verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+
+ fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
+ verifyMeter(search, RequiredSearch::counter, verifier)
+
+ fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
+ verifyCounter(registrySearch().name(name), verifier)
+
+ fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
+ registry.meters
+ .filter { it is Counter }
+ .filterNot { it.id.name in changedCounters }
+ .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) }
+ }
+
+ describe("notifyBytesReceived") {
+
+ on("data.received.bytes counter") {
+ val counterName = "data.received.bytes"
+
+ it("should increment counter") {
+ val bytes = 128
+ cut.notifyBytesReceived(bytes)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyBytesReceived(128)
+ verifyAllCountersAreUnchangedBut(counterName)
+ }
+ }
+ }
+
+ describe("notifyMessageReceived") {
+ on("messages.received.count counter") {
+ val counterName = "messages.received.count"
+
+ it("should increment counter") {
+ cut.notifyMessageReceived(777)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ on("messages.received.bytes counter") {
+ val counterName = "messages.received.bytes"
+
+ it("should increment counter") {
+ val bytes = 888
+ cut.notifyMessageReceived(bytes)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ }
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyMessageReceived(128)
+ verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes")
+ }
+ }
+
+ describe("notifyMessageSent") {
+ val topicName = "dmaap_topic_name"
+ val counterName = "messages.sent.count"
+
+ on("$counterName counter") {
+
+ it("should increment counter") {
+ cut.notifyMessageSent(topicName)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ on("$counterName[topic=$topicName] counter") {
+
+ it("should increment counter") {
+ cut.notifyMessageSent(topicName)
+
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyMessageSent(topicName)
+ verifyAllCountersAreUnchangedBut(counterName)
+ }
+ }
+
+ describe("processing gauge") {
+ it("should show difference between sent and received messages") {
+
+ on("positive difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageReceived(256)
+ cut.notifyMessageReceived(256)
+ cut.notifyMessageSent("perf3gpp")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ on("zero difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageSent("perf3gpp")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ on("negative difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageSent("fault")
+ cut.notifyMessageSent("perf3gpp")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
+
+})
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt
new file mode 100644
index 00000000..4eef28bb
--- /dev/null
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.xdescribe
+import java.nio.ByteBuffer
+
+object NioBuffersTest : Spek({
+
+ fun Int.toKibibytes(): Int = this * 1024
+ fun Int.toMebibytes(): Int = this * 1024 * 1024
+
+ val BUFFER_SIZES = listOf(128.toKibibytes(), 512.toKibibytes(), 1.toMebibytes(), 2.toMebibytes())
+ val NUMBER_OF_ITERATIONS = 100
+
+ fun measureCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double {
+ bb1.clear()
+ bb2.clear()
+ val start = System.nanoTime()
+ while (bb2.remaining() > 0)
+ bb2.putInt(bb1.getInt())
+ val time = System.nanoTime() - start
+ val operations = bb1.capacity() / Integer.BYTES
+ return time.toDouble() / operations
+ }
+
+ fun measureAverageCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double =
+ (0..NUMBER_OF_ITERATIONS).map { measureCopyTimeInNanos(bb1, bb2) }.average()
+
+ fun measureAndPrintAverageCopyTime(message: String, bb1: ByteBuffer, bb2: ByteBuffer) {
+ val avg = measureAverageCopyTimeInNanos(bb1, bb2)
+ System.out.printf("Each putInt+getInt for %s took an average of %.1f ns%n", message, avg)
+ }
+
+ for (singleBufferSize in BUFFER_SIZES) {
+
+ xdescribe("$singleBufferSize bytes buffers") {
+ describe("direct buffers") {
+
+ val bb1 = ByteBuffer.allocateDirect(singleBufferSize)
+ val bb2 = ByteBuffer.allocateDirect(singleBufferSize)
+
+ it("should be heated up") {
+ measureAverageCopyTimeInNanos(bb1, bb2)
+ }
+
+ it("should work fast") {
+ measureAndPrintAverageCopyTime("direct buffers of $singleBufferSize bytes", bb1, bb2)
+ }
+ }
+
+ describe("on-heap buffers") {
+
+ val bb1 = ByteBuffer.allocate(singleBufferSize)
+ val bb2 = ByteBuffer.allocate(singleBufferSize)
+
+ it("should be heated up") {
+ measureAverageCopyTimeInNanos(bb1, bb2)
+ }
+
+ it("should work fast") {
+ measureAndPrintAverageCopyTime("onheap buffers of $singleBufferSize bytes", bb1, bb2)
+ }
+ }
+ }
+ }
+
+}) \ No newline at end of file