aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-main')
-rw-r--r--hv-collector-main/pom.xml99
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt82
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt72
-rw-r--r--hv-collector-main/src/main/resources/logback.xml35
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt72
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt87
6 files changed, 447 insertions, 0 deletions
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
new file mode 100644
index 00000000..9f02ed58
--- /dev/null
+++ b/hv-collector-main/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+ <artifactId>ves-hv-collector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-main</artifactId>
+ <description>VES HighVolume Collector :: Main</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
new file mode 100644
index 00000000..6311b6c0
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.apache.commons.cli.*
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+
+internal object DefaultValues {
+ const val PORT = 8600
+ const val CONFIG_URL = ""
+}
+
+internal object ArgBasedServerConfiguration {
+ private val OPT_PORT = Option.builder("p")
+ .longOpt("listen-port")
+ .hasArg()
+ .desc("Listen port")
+ .build()
+
+ private val OPT_CONFIG_URL = Option.builder("c")
+ .longOpt("config-url")
+ .optionalArg(true)
+ .hasArg()
+ .desc("Url of ves configuration on consul")
+ .build()
+
+ private val options by lazy {
+ val options = Options()
+ options.addOption(OPT_PORT)
+ options.addOption(OPT_CONFIG_URL)
+ options
+ }
+
+ fun parse(args: Array<out String>): ServerConfiguration {
+ val parser = DefaultParser()
+
+ try {
+ parser.parse(options, args).run {
+ return ServerConfiguration(
+ intValue(OPT_PORT, DefaultValues.PORT),
+ stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL))
+ }
+ } catch (ex: Exception) {
+ throw WrongArgumentException(ex)
+ }
+ }
+
+ private fun CommandLine.intValue(option: Option, default: Int) =
+ getOptionValue(option.opt)?.toInt() ?: default
+
+ private fun CommandLine.stringValue(option: Option, default: String) =
+ getOptionValue(option.opt) ?: default
+
+
+ class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) {
+ fun printMessage() {
+ println(message)
+ }
+
+ fun printHelp(programName: String) {
+ val formatter = HelpFormatter()
+ formatter.printHelp(programName, options)
+ }
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
new file mode 100644
index 00000000..d81a063d
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import org.onap.dcae.collectors.veshv.domain.routing
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.slf4j.LoggerFactory
+import kotlin.system.exitProcess
+
+private val logger = LoggerFactory.getLogger("main")
+
+fun main(args: Array<String>) {
+ try {
+ val serverConfiguration = ArgBasedServerConfiguration.parse(args)
+
+ val collectorProvider = CollectorFactory(
+ resolveConfigurationProvider(serverConfiguration),
+ AdapterFactory.kafkaSink()
+ ).createVesHvCollectorProvider()
+ ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
+ } catch (ex: WrongArgumentException) {
+ ex.printMessage()
+ ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
+ exitProcess(1)
+ }
+}
+
+
+private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider {
+
+ if (serverConfiguration.configurationUrl.isEmpty()) {
+ logger.info("Configuration url not specified - using default config")
+ val sampleConfig = CollectorConfiguration(
+ kafkaBootstrapServers = "dmaap.cluster.local:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic("ves_hvRanMeas")
+ withFixedPartitioning()
+ }
+ }.build()
+ )
+ return AdapterFactory.staticConfigurationProvider(sampleConfig)
+ }
+
+ logger.info("Using configuration url: ${serverConfiguration.configurationUrl}")
+ return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl)
+}
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
new file mode 100644
index 00000000..809f62d4
--- /dev/null
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
new file mode 100644
index 00000000..0d2188ca
--- /dev/null
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ArgBasedServerConfigurationTest : Spek({
+ val cut = ArgBasedServerConfiguration
+ val configurationUrl = "http://test-address/test"
+
+ fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+
+ given("all parameters are present in the long form") {
+ val result = parse("--listen-port", "6969", "--config-url", configurationUrl)
+
+ it("should set proper port") {
+ assertThat(result.port).isEqualTo(6969)
+ }
+
+ it("should set proper config url") {
+ assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ }
+ }
+
+ given("all parameters are present in the short form") {
+ val result = parse("-p", "666", "-c", configurationUrl)
+
+ it("should set proper port") {
+ assertThat(result.port).isEqualTo(666)
+ }
+
+ it("should set proper config url") {
+ assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ }
+ }
+
+ given("all optional parameters are absent") {
+ val result = parse()
+
+ it("should set default port") {
+ assertThat(result.port).isEqualTo(DefaultValues.PORT)
+ }
+
+ it("should set default config url") {
+ assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt
new file mode 100644
index 00000000..b46d5a28
--- /dev/null
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import java.nio.ByteBuffer
+
+fun Int.toKibibytes(): Int = this * 1024
+fun Int.toMebibytes(): Int = this * 1024 * 1024
+
+
+object NioBuffersTest : Spek({
+ val BUFFER_SIZES = listOf(128.toKibibytes(), 512.toKibibytes(), 1.toMebibytes(), 2.toMebibytes())
+ val NUMBER_OF_ITERATIONS = 100
+
+ fun measureCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double {
+ bb1.clear()
+ bb2.clear()
+ val start = System.nanoTime()
+ while (bb2.remaining() > 0)
+ bb2.putInt(bb1.getInt())
+ val time = System.nanoTime() - start
+ val operations = bb1.capacity() / Integer.BYTES
+ return time.toDouble() / operations
+ }
+
+ fun measureAverageCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double =
+ (0..NUMBER_OF_ITERATIONS).map { measureCopyTimeInNanos(bb1, bb2) }.average()
+
+ fun measureAndPrintAverageCopyTime(message: String, bb1: ByteBuffer, bb2: ByteBuffer) {
+ val avg = measureAverageCopyTimeInNanos(bb1, bb2)
+ System.out.printf("Each putInt+getInt for %s took an average of %.1f ns%n", message, avg)
+ }
+
+ for (singleBufferSize in BUFFER_SIZES) {
+
+ describe("$singleBufferSize bytes buffers") {
+ describe("direct buffers") {
+
+ val bb1 = ByteBuffer.allocateDirect(singleBufferSize)
+ val bb2 = ByteBuffer.allocateDirect(singleBufferSize)
+
+ it("should be heated up") {
+ measureAverageCopyTimeInNanos(bb1, bb2)
+ }
+
+ it("should work fast") {
+ measureAndPrintAverageCopyTime("direct buffers of $singleBufferSize bytes", bb1, bb2)
+ }
+ }
+
+ describe("on-heap buffers") {
+
+ val bb1 = ByteBuffer.allocate(singleBufferSize)
+ val bb2 = ByteBuffer.allocate(singleBufferSize)
+
+ it("should be heated up") {
+ measureAverageCopyTimeInNanos(bb1, bb2)
+ }
+
+ it("should work fast") {
+ measureAndPrintAverageCopyTime("onheap buffers of $singleBufferSize bytes", bb1, bb2)
+ }
+ }
+ }
+ }
+
+}) \ No newline at end of file