diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-05-29 14:46:27 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 09:18:45 +0200 |
commit | 7bc4137767f88c2aae8f966065c5bea083234190 (patch) | |
tree | 0e46ad6fe06c817da5ba32340b3be07e829bd241 | |
parent | 067c44618a7daa38c14fe42902c43b8dddd348f9 (diff) |
Implemented simple tcp client
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Change-Id: Iaf913186b93eb7eebfb6f44c19d489a64ed60c2b
Issue-ID: DCAEGEN2-601
11 files changed, 298 insertions, 41 deletions
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index 11d287c4..caa1099b 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -59,6 +59,25 @@ <dependencies> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>protobuf</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -97,7 +116,7 @@ <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> - <scope>test</scope> + <scope>runtime</scope> </dependency> </dependencies> diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt new file mode 100644 index 00000000..7c28edac --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt @@ -0,0 +1,83 @@ +package org.onap.dcae.collectors.veshv.main.config + +import org.apache.commons.cli.Option +import org.apache.commons.cli.Options +import org.apache.commons.cli.DefaultParser +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.HelpFormatter + + +internal object DefaultValues { + const val MESSAGES_AMOUNT = 1 +} + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +internal object ArgBasedClientConfiguration { + + private val OPT_VES_PORT = Option.builder("p") + .longOpt("port") + .required() + .hasArg() + .desc("VesHvCollector port") + .build() + + private val OPT_VES_HOST = Option.builder("h") + .longOpt("host") + .required() + .hasArg() + .desc("VesHvCollector host") + .build() + + private val OPT_MESSAGES_AMOUNT = Option.builder("m") + .longOpt("messages") + .hasArg() + .desc("Amount of messages to send") + .build() + + private val options by lazy { + val options = Options() + options.addOption(OPT_VES_PORT) + options.addOption(OPT_VES_HOST) + options.addOption(OPT_MESSAGES_AMOUNT) + options + } + + fun parse(args: Array<out String>): ClientConfiguration { + val parser = DefaultParser() + + try { + parser.parse(options, args).run { + return ClientConfiguration( + stringValue(OPT_VES_HOST), + intValue(OPT_VES_PORT), + intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)) + } + } catch (ex: Exception) { + throw WrongArgumentException(ex) + } + } + + private fun CommandLine.intValueOrDefault(option: Option, default: Int) = + getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.intValue(option: Option) = + getOptionValue(option.opt).toInt() + + private fun CommandLine.stringValue(option: Option) = + getOptionValue(option.opt) + + + class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { + fun printMessage() { + println(message) + } + + fun printHelp(programName: String) { + val formatter = HelpFormatter() + formatter.printHelp(programName, options) + } + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt new file mode 100644 index 00000000..742c2869 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt @@ -0,0 +1,7 @@ +package org.onap.dcae.collectors.veshv.main.config + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int) diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt new file mode 100644 index 00000000..e0c53ae3 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt @@ -0,0 +1,59 @@ +package org.onap.dcae.collectors.veshv.main.impl + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.ves.VesEventV5 +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class MessageFactory { + + companion object { + const val DEFAULT_START_EPOCH: Long = 120034455 + const val DEFAULT_LAST_EPOCH: Long = 120034455 + } + + fun createMessageFlux(amount: Int = 1): Flux<WireFrame> = + Mono.just(createMessage()).repeat(amount.toLong()) + + + private fun createMessage(): WireFrame { + val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() + .setVersion("1.9") + .setEventName("Sample event name") + .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS) + .setEventId("Sample event Id") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(DEFAULT_START_EPOCH) + .setLastEpochMicrosec(DEFAULT_LAST_EPOCH) + .setSequence(2) + .build() + + val payload = vesMessageBytes(commonHeader) + return WireFrame( + payload = payload, + mark = 0xFF, + majorVersion = 1, + minorVersion = 2, + payloadSize = payload.readableBytes()) + + + } + + private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { + val msg = VesEventV5.VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) + .build() + + return Unpooled.wrappedBuffer(msg.toByteArray()) + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt new file mode 100644 index 00000000..d23a6f7e --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt @@ -0,0 +1,42 @@ +package org.onap.dcae.collectors.veshv.main.impl + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.ipc.netty.NettyInbound +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpClient +import java.util.function.BiFunction + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class VesHvClient(configuration: ClientConfiguration) { + + private val logger = Logger(VesHvClient::class) + private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort) + + fun send(messages: Flux<WireFrame>) { + client.start(BiFunction { i, o -> handler(i, o, messages) }) + } + + // sending flux with multiple WireFrames not supported yet + private fun handler(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + messages: Flux<WireFrame>): Publisher<Void> { + + nettyInbound + .receive() + .asString(Charsets.UTF_8) + .subscribe { str -> logger.info("Server response: $str") } + + return nettyOutbound + .options { it.flushOnEach() } + .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) }) + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt index 35710c06..137ffdff 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt @@ -19,11 +19,26 @@ */ package org.onap.dcae.collectors.veshv.main +import org.onap.dcae.collectors.veshv.main.config.ArgBasedClientConfiguration +import org.onap.dcae.collectors.veshv.main.impl.MessageFactory +import org.onap.dcae.collectors.veshv.main.impl.VesHvClient import org.slf4j.LoggerFactory.getLogger private val logger = getLogger("Simulator :: main") -fun main(args : Array<String>){ - logger.info("Hello world") -}
\ No newline at end of file +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +fun main(args: Array<String>) = try { + + val clientConfig = ArgBasedClientConfiguration.parse(args) + val messageFactory = MessageFactory() + val client = VesHvClient(clientConfig) + client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) +} catch (e: Exception) { + logger.error(e.localizedMessage) + logger.debug("An error occurred when starting ves client", e) +} + diff --git a/hv-collector-client-simulator/src/main/resources/logback.xml b/hv-collector-client-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-client-simulator/src/main/resources/logback.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt deleted file mode 100644 index 770adeba..00000000 --- a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import kotlin.test.assertEquals - -object DummyTest : Spek({ - on("sum of 2 and 3") { - val sum = 2 + 3 - it("outcome should be equals 5"){ - assertEquals(5, sum) - } - } -}) diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt new file mode 100644 index 00000000..5a89da48 --- /dev/null +++ b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt @@ -0,0 +1,31 @@ +package org.onap.dcae.collectors.veshv.main + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.main.impl.MessageFactory +import kotlin.test.assertEquals + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +object WireFrameTest : Spek({ + + val factory = MessageFactory() + + + given("no parameters") { + it("should return flux with one message") { + val result = factory.createMessageFlux() + + assertEquals(1, result.count().block()) + } + } + given("messages amount") { + it("should return message flux of specified size") { + val result = factory.createMessageFlux(5) + assertEquals(5, result.count().block()) + } + } +})
\ No newline at end of file diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt index ea78706e..cf484d7c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt @@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.domain * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class ServerConfiguration(val port: Int, val configurationUrl: String) +data class ServerConfiguration( val configurationUrl: String, val port: Int) diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt index 6311b6c0..4e614cdb 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -54,8 +54,8 @@ internal object ArgBasedServerConfiguration { try { parser.parse(options, args).run { return ServerConfiguration( - intValue(OPT_PORT, DefaultValues.PORT), - stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL)) + stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL), + intValue(OPT_PORT, DefaultValues.PORT)) } } catch (ex: Exception) { throw WrongArgumentException(ex) |