From e98fdcc3087d06b76066ae2d2c7d0bde41d7776b Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 26 Apr 2018 09:17:09 +0200 Subject: HV VES Collector seed code Contains squashed commits up to 11fe6b63 (2018-05-30). The whole contains a basic project structure. We are trying to put rest of the commits one by one so we do not loose the history. Bellow there are messages of the single commits in this squashed bulk: Basic project setup Create base maven project with Gitlab CI configuration. Piotr Jaszczyk Merging guildeline Piotr Jaszczyk Add remote branch delete command Piotr Jaszczyk Sample runtime in Kotlin - PoC Piotr Jaszczyk Setup project internal architecture Piotr Jaszczyk Message routing Determine target topic and partition by VES Common Header. Piotr Jaszczyk Parse GPB message header fkrzywka Set listen port based on command line args Use Apache Commons CLI to parse cmd line args. Piotr Jaszczyk Drop invalid GPB messages Instead of propagating error and closing stream just drop the message and proceed. Final handling logic may include closing the connection or sending some message depending on the specification. Piotr Jaszczyk Add Apache license file Piotr Jaszczyk Convert to maven multi-module project fkrzywka Component tests with current GPB schema * Using v5 draft protobuf definition * Code reorganized to so component boundaries are more visible Piotr Jaszczyk Thin logging facade over slf4j Piotr Jaszczyk Introduce code analysis tools Piotr Jaszczyk Implemented reading configuration from consul Ves Common Header validation added (required parameters existance check) Micro benchmark for direct vs on-heap NIO buffers Piotr Jaszczyk Decode wire protocol and fix (most?) memory leaks Proposed wire protocol is just a suggestion and will (should) change in the future. Netty's ByteBuf is a reference-counted wrapper over a memory chunk. It is crucial to free unused buffers by means of release() method. The general rule regarding memory management was suggested. Let's put all memory-cleanup logic in main VesHvCollector class so other classes could focus on their job. Piotr Jaszczyk Minor cleanup Piotr Jaszczyk Add license info in files Piotr Jaszczyk Change-Id: Ic484aa107eba48ad48f8ab222799e1795dffa865 Issue-ID: DCAEGEN2-601 Signed-off-by: Piotr Jaszczyk --- .../veshv/main/ArgBasedServerConfiguration.kt | 82 ++++++++++++++++++++ .../org/onap/dcae/collectors/veshv/main/main.kt | 72 ++++++++++++++++++ hv-collector-main/src/main/resources/logback.xml | 35 +++++++++ .../veshv/main/ArgBasedServerConfigurationTest.kt | 72 ++++++++++++++++++ .../dcae/collectors/veshv/main/NioBuffersTest.kt | 87 ++++++++++++++++++++++ 5 files changed, 348 insertions(+) create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt create mode 100644 hv-collector-main/src/main/resources/logback.xml create mode 100644 hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt create mode 100644 hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt (limited to 'hv-collector-main/src') diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt new file mode 100644 index 00000000..6311b6c0 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.apache.commons.cli.* +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration + +internal object DefaultValues { + const val PORT = 8600 + const val CONFIG_URL = "" +} + +internal object ArgBasedServerConfiguration { + private val OPT_PORT = Option.builder("p") + .longOpt("listen-port") + .hasArg() + .desc("Listen port") + .build() + + private val OPT_CONFIG_URL = Option.builder("c") + .longOpt("config-url") + .optionalArg(true) + .hasArg() + .desc("Url of ves configuration on consul") + .build() + + private val options by lazy { + val options = Options() + options.addOption(OPT_PORT) + options.addOption(OPT_CONFIG_URL) + options + } + + fun parse(args: Array): ServerConfiguration { + val parser = DefaultParser() + + try { + parser.parse(options, args).run { + return ServerConfiguration( + intValue(OPT_PORT, DefaultValues.PORT), + stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL)) + } + } catch (ex: Exception) { + throw WrongArgumentException(ex) + } + } + + private fun CommandLine.intValue(option: Option, default: Int) = + getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.stringValue(option: Option, default: String) = + getOptionValue(option.opt) ?: default + + + class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { + fun printMessage() { + println(message) + } + + fun printHelp(programName: String) { + val formatter = HelpFormatter() + formatter.printHelp(programName, options) + } + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt new file mode 100644 index 00000000..d81a063d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.ServerConfiguration +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.slf4j.LoggerFactory +import kotlin.system.exitProcess + +private val logger = LoggerFactory.getLogger("main") + +fun main(args: Array) { + try { + val serverConfiguration = ArgBasedServerConfiguration.parse(args) + + val collectorProvider = CollectorFactory( + resolveConfigurationProvider(serverConfiguration), + AdapterFactory.kafkaSink() + ).createVesHvCollectorProvider() + ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() + } catch (ex: WrongArgumentException) { + ex.printMessage() + ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") + exitProcess(1) + } +} + + +private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider { + + if (serverConfiguration.configurationUrl.isEmpty()) { + logger.info("Configuration url not specified - using default config") + val sampleConfig = CollectorConfiguration( + kafkaBootstrapServers = "dmaap.cluster.local:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build() + ) + return AdapterFactory.staticConfigurationProvider(sampleConfig) + } + + logger.info("Using configuration url: ${serverConfiguration.configurationUrl}") + return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl) +} diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-main/src/main/resources/logback.xml @@ -0,0 +1,35 @@ + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + + + + + + + ${FILE_LOG_PATTERN} + + ${LOG_FILE} + + ${LOG_FILE}.%d{yyyy-MM-dd}.log + 50MB + 30 + 10GB + + + + + + + + + + \ No newline at end of file diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt new file mode 100644 index 00000000..0d2188ca --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +object ArgBasedServerConfigurationTest : Spek({ + val cut = ArgBasedServerConfiguration + val configurationUrl = "http://test-address/test" + + fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + + given("all parameters are present in the long form") { + val result = parse("--listen-port", "6969", "--config-url", configurationUrl) + + it("should set proper port") { + assertThat(result.port).isEqualTo(6969) + } + + it("should set proper config url") { + assertThat(result.configurationUrl).isEqualTo(configurationUrl) + } + } + + given("all parameters are present in the short form") { + val result = parse("-p", "666", "-c", configurationUrl) + + it("should set proper port") { + assertThat(result.port).isEqualTo(666) + } + + it("should set proper config url") { + assertThat(result.configurationUrl).isEqualTo(configurationUrl) + } + } + + given("all optional parameters are absent") { + val result = parse() + + it("should set default port") { + assertThat(result.port).isEqualTo(DefaultValues.PORT) + } + + it("should set default config url") { + assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL) + } + } +}) \ No newline at end of file diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt new file mode 100644 index 00000000..b46d5a28 --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/NioBuffersTest.kt @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import java.nio.ByteBuffer + +fun Int.toKibibytes(): Int = this * 1024 +fun Int.toMebibytes(): Int = this * 1024 * 1024 + + +object NioBuffersTest : Spek({ + val BUFFER_SIZES = listOf(128.toKibibytes(), 512.toKibibytes(), 1.toMebibytes(), 2.toMebibytes()) + val NUMBER_OF_ITERATIONS = 100 + + fun measureCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double { + bb1.clear() + bb2.clear() + val start = System.nanoTime() + while (bb2.remaining() > 0) + bb2.putInt(bb1.getInt()) + val time = System.nanoTime() - start + val operations = bb1.capacity() / Integer.BYTES + return time.toDouble() / operations + } + + fun measureAverageCopyTimeInNanos(bb1: ByteBuffer, bb2: ByteBuffer): Double = + (0..NUMBER_OF_ITERATIONS).map { measureCopyTimeInNanos(bb1, bb2) }.average() + + fun measureAndPrintAverageCopyTime(message: String, bb1: ByteBuffer, bb2: ByteBuffer) { + val avg = measureAverageCopyTimeInNanos(bb1, bb2) + System.out.printf("Each putInt+getInt for %s took an average of %.1f ns%n", message, avg) + } + + for (singleBufferSize in BUFFER_SIZES) { + + describe("$singleBufferSize bytes buffers") { + describe("direct buffers") { + + val bb1 = ByteBuffer.allocateDirect(singleBufferSize) + val bb2 = ByteBuffer.allocateDirect(singleBufferSize) + + it("should be heated up") { + measureAverageCopyTimeInNanos(bb1, bb2) + } + + it("should work fast") { + measureAndPrintAverageCopyTime("direct buffers of $singleBufferSize bytes", bb1, bb2) + } + } + + describe("on-heap buffers") { + + val bb1 = ByteBuffer.allocate(singleBufferSize) + val bb2 = ByteBuffer.allocate(singleBufferSize) + + it("should be heated up") { + measureAverageCopyTimeInNanos(bb1, bb2) + } + + it("should work fast") { + measureAndPrintAverageCopyTime("onheap buffers of $singleBufferSize bytes", bb1, bb2) + } + } + } + } + +}) \ No newline at end of file -- cgit 1.2.3-korg