diff options
Diffstat (limited to 'hv-collector-main')
6 files changed, 447 insertions, 0 deletions
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml new file mode 100644 index 00000000..9f02ed58 --- /dev/null +++ b/hv-collector-main/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.veshv</groupId> + <artifactId>ves-hv-collector</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-main</artifactId> + <description>VES HighVolume Collector :: Main</description> + + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + </dependencies> + + +</project> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt new file mode 100644 index 00000000..6311b6c0 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.apache.commons.cli.* +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration + +internal object DefaultValues { + const val PORT = 8600 + const val CONFIG_URL = "" +} + +internal object ArgBasedServerConfiguration { + private val OPT_PORT = Option.builder("p") + .longOpt("listen-port") + .hasArg() + .desc("Listen port") + .build() + + private val OPT_CONFIG_URL = Option.builder("c") + .longOpt("config-url") + .optionalArg(true) + .hasArg() + .desc("Url of ves configuration on consul") + .build() + + private val options by lazy { + val options = Options() + options.addOption(OPT_PORT) + options.addOption(OPT_CONFIG_URL) + options + } + + fun parse(args: Array<out String>): ServerConfiguration { + val parser = DefaultParser() + + try { + parser.parse(options, args).run { + return ServerConfiguration( + intValue(OPT_PORT, DefaultValues.PORT), + stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL)) + } + } catch (ex: Exception) { + throw WrongArgumentException(ex) + } + } + + private fun CommandLine.intValue(option: Option, default: Int) = + getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.stringValue(option: Option, default: String) = + getOptionValue(option.opt) ?: default + + + class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { + fun printMessage() { + println(message) + } + + fun printHelp(programName: String) { + val formatter = HelpFormatter() + formatter.printHelp(programName, options) + } + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt new file mode 100644 index 00000000..d81a063d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.slf4j.LoggerFactory +import kotlin.system.exitProcess + +private val logger = LoggerFactory.getLogger("main") + +fun main(args: Array<String>) { + try { + val serverConfiguration = ArgBasedServerConfiguration.parse(args) + + val collectorProvider = CollectorFactory( + resolveConfigurationProvider(serverConfiguration), + AdapterFactory.kafkaSink() + ).createVesHvCollectorProvider() + ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() + } catch (ex: WrongArgumentException) { + ex.printMessage() + ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") + exitProcess(1) + } +} + + +private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider { + + if (serverConfiguration.configurationUrl.isEmpty()) { + logger.info("Configuration url not specified - using default config") + val sampleConfig = CollectorConfiguration( + kafkaBootstrapServers = "dmaap.cluster.local:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build() + ) + return AdapterFactory.staticConfigurationProvider(sampleConfig) + } + + logger.info("Using configuration url: ${serverConfiguration.configurationUrl}") + return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl) +} diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-main/src/main/resources/logback.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt new file mode 100644 index 00000000..0d2188ca --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object ArgBasedServerConfigurationTest : Spek({ + val cut = ArgBasedServerConfiguration + val configurationUrl = "http://test-address/test" + + fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + + given("all parameters are present in the long form") { + val result = parse("--listen-port", "6969", "--config-url", configurationUrl) + + it("should set proper port") { + assertThat(result.port).isEqualTo(6969) + } + + it("should set proper config url") { + assertThat(result.configurationUrl).isEqualTo(configurationUrl) + } + } + + given("all parameters are present in the short form") { + val result = parse("-p", "666", "-c", configurationUrl) + + it("should set proper port") { + assertThat(result.port).isEqualTo(666) + } + + it("should set proper config url") { + assertThat(result.configurationUrl).isEqualTo(configurationUrl) + } + } + + given("all optional parameters are absent") { + val result = parse() + + it("should set default port") { + assertThat(result.port).isEqualTo(DefaultValues.PORT) + } + + it("should set default config url") { + assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL) + } + } +})
\ No newline at end of file diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt new file mode 100644 index 00000000..b46d5a28 --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import java.nio.ByteBuffer + +fun Int.toKibibytes(): Int = this * 1024 +fun Int.toMebibytes(): Int = this * 1024 * 1024 + + +object NioBuffersTest : Spek({ + val BUFFER_SIZES = listOf(128.toKibibytes(), 512.toKibibytes(), 1.toMebibytes(), 2.toMebibytes()) + val NUMBER_OF_ITERATIONS = 100 + + fun measureCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double { + bb1.clear() + bb2.clear() + val start = System.nanoTime() + while (bb2.remaining() > 0) + bb2.putInt(bb1.getInt()) + val time = System.nanoTime() - start + val operations = bb1.capacity() / Integer.BYTES + return time.toDouble() / operations + } + + fun measureAverageCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double = + (0..NUMBER_OF_ITERATIONS).map { measureCopyTimeInNanos(bb1, bb2) }.average() + + fun measureAndPrintAverageCopyTime(message: String, bb1: ByteBuffer, bb2: ByteBuffer) { + val avg = measureAverageCopyTimeInNanos(bb1, bb2) + System.out.printf("Each putInt+getInt for %s took an average of %.1f ns%n", message, avg) + } + + for (singleBufferSize in BUFFER_SIZES) { + + describe("$singleBufferSize bytes buffers") { + describe("direct buffers") { + + val bb1 = ByteBuffer.allocateDirect(singleBufferSize) + val bb2 = ByteBuffer.allocateDirect(singleBufferSize) + + it("should be heated up") { + measureAverageCopyTimeInNanos(bb1, bb2) + } + + it("should work fast") { + measureAndPrintAverageCopyTime("direct buffers of $singleBufferSize bytes", bb1, bb2) + } + } + + describe("on-heap buffers") { + + val bb1 = ByteBuffer.allocate(singleBufferSize) + val bb2 = ByteBuffer.allocate(singleBufferSize) + + it("should be heated up") { + measureAverageCopyTimeInNanos(bb1, bb2) + } + + it("should work fast") { + measureAndPrintAverageCopyTime("onheap buffers of $singleBufferSize bytes", bb1, bb2) + } + } + } + } + +})
\ No newline at end of file |