From 7bc4137767f88c2aae8f966065c5bea083234190 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Tue, 29 May 2018 14:46:27 +0200 Subject: Implemented simple tcp client Signed-off-by: Jakub Dudycz Change-Id: Iaf913186b93eb7eebfb6f44c19d489a64ed60c2b Issue-ID: DCAEGEN2-601 --- hv-collector-client-simulator/pom.xml | 21 +++++- .../config/ArgBasedClientConfiguration.kt | 83 ++++++++++++++++++++++ .../config/ClientConfiguration.kt | 7 ++ .../impl/MessageFactory.kt | 59 +++++++++++++++ .../impl/VesHvClient.kt | 42 +++++++++++ .../org.onap.dcae.collectors.veshv.main/main.kt | 21 +++++- .../src/main/resources/logback.xml | 35 +++++++++ .../DummyTest.kt | 34 --------- .../MessageFactoryTest.kt | 31 ++++++++ 9 files changed, 295 insertions(+), 38 deletions(-) create mode 100644 hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt create mode 100644 hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt create mode 100644 hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt create mode 100644 hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt create mode 100644 hv-collector-client-simulator/src/main/resources/logback.xml delete mode 100644 hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt create mode 100644 hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt (limited to 'hv-collector-client-simulator') diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index 11d287c4..caa1099b 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -58,6 +58,25 @@ + + ${project.parent.groupId} + protobuf + ${project.parent.version} + + + ${project.parent.groupId} + hv-collector-utils + ${project.parent.version} + + + ${project.parent.groupId} + hv-collector-core + ${project.parent.version} + + + commons-cli + commons-cli + org.slf4j slf4j-api @@ -97,7 +116,7 @@ ch.qos.logback logback-classic - test + runtime diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt new file mode 100644 index 00000000..7c28edac --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt @@ -0,0 +1,83 @@ +package org.onap.dcae.collectors.veshv.main.config + +import org.apache.commons.cli.Option +import org.apache.commons.cli.Options +import org.apache.commons.cli.DefaultParser +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.HelpFormatter + + +internal object DefaultValues { + const val MESSAGES_AMOUNT = 1 +} + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +internal object ArgBasedClientConfiguration { + + private val OPT_VES_PORT = Option.builder("p") + .longOpt("port") + .required() + .hasArg() + .desc("VesHvCollector port") + .build() + + private val OPT_VES_HOST = Option.builder("h") + .longOpt("host") + .required() + .hasArg() + .desc("VesHvCollector host") + .build() + + private val OPT_MESSAGES_AMOUNT = Option.builder("m") + .longOpt("messages") + .hasArg() + .desc("Amount of messages to send") + .build() + + private val options by lazy { + val options = Options() + options.addOption(OPT_VES_PORT) + options.addOption(OPT_VES_HOST) + options.addOption(OPT_MESSAGES_AMOUNT) + options + } + + fun parse(args: Array): ClientConfiguration { + val parser = DefaultParser() + + try { + parser.parse(options, args).run { + return ClientConfiguration( + stringValue(OPT_VES_HOST), + intValue(OPT_VES_PORT), + intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)) + } + } catch (ex: Exception) { + throw WrongArgumentException(ex) + } + } + + private fun CommandLine.intValueOrDefault(option: Option, default: Int) = + getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.intValue(option: Option) = + getOptionValue(option.opt).toInt() + + private fun CommandLine.stringValue(option: Option) = + getOptionValue(option.opt) + + + class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { + fun printMessage() { + println(message) + } + + fun printHelp(programName: String) { + val formatter = HelpFormatter() + formatter.printHelp(programName, options) + } + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt new file mode 100644 index 00000000..742c2869 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt @@ -0,0 +1,7 @@ +package org.onap.dcae.collectors.veshv.main.config + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int) diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt new file mode 100644 index 00000000..e0c53ae3 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt @@ -0,0 +1,59 @@ +package org.onap.dcae.collectors.veshv.main.impl + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.ves.VesEventV5 +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class MessageFactory { + + companion object { + const val DEFAULT_START_EPOCH: Long = 120034455 + const val DEFAULT_LAST_EPOCH: Long = 120034455 + } + + fun createMessageFlux(amount: Int = 1): Flux = + Mono.just(createMessage()).repeat(amount.toLong()) + + + private fun createMessage(): WireFrame { + val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() + .setVersion("1.9") + .setEventName("Sample event name") + .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS) + .setEventId("Sample event Id") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(DEFAULT_START_EPOCH) + .setLastEpochMicrosec(DEFAULT_LAST_EPOCH) + .setSequence(2) + .build() + + val payload = vesMessageBytes(commonHeader) + return WireFrame( + payload = payload, + mark = 0xFF, + majorVersion = 1, + minorVersion = 2, + payloadSize = payload.readableBytes()) + + + } + + private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { + val msg = VesEventV5.VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) + .build() + + return Unpooled.wrappedBuffer(msg.toByteArray()) + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt new file mode 100644 index 00000000..d23a6f7e --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt @@ -0,0 +1,42 @@ +package org.onap.dcae.collectors.veshv.main.impl + +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.ipc.netty.NettyInbound +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpClient +import java.util.function.BiFunction + + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class VesHvClient(configuration: ClientConfiguration) { + + private val logger = Logger(VesHvClient::class) + private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort) + + fun send(messages: Flux) { + client.start(BiFunction { i, o -> handler(i, o, messages) }) + } + + // sending flux with multiple WireFrames not supported yet + private fun handler(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + messages: Flux): Publisher { + + nettyInbound + .receive() + .asString(Charsets.UTF_8) + .subscribe { str -> logger.info("Server response: $str") } + + return nettyOutbound + .options { it.flushOnEach() } + .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) }) + } +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt index 35710c06..137ffdff 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt @@ -19,11 +19,26 @@ */ package org.onap.dcae.collectors.veshv.main +import org.onap.dcae.collectors.veshv.main.config.ArgBasedClientConfiguration +import org.onap.dcae.collectors.veshv.main.impl.MessageFactory +import org.onap.dcae.collectors.veshv.main.impl.VesHvClient import org.slf4j.LoggerFactory.getLogger private val logger = getLogger("Simulator :: main") -fun main(args : Array){ - logger.info("Hello world") -} \ No newline at end of file +/** + * @author Jakub Dudycz + * @since June 2018 + */ +fun main(args: Array) = try { + + val clientConfig = ArgBasedClientConfiguration.parse(args) + val messageFactory = MessageFactory() + val client = VesHvClient(clientConfig) + client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) +} catch (e: Exception) { + logger.error(e.localizedMessage) + logger.debug("An error occurred when starting ves client", e) +} + diff --git a/hv-collector-client-simulator/src/main/resources/logback.xml b/hv-collector-client-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/hv-collector-client-simulator/src/main/resources/logback.xml @@ -0,0 +1,35 @@ + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + + + + + + + ${FILE_LOG_PATTERN} + + ${LOG_FILE} + + ${LOG_FILE}.%d{yyyy-MM-dd}.log + 50MB + 30 + 10GB + + + + + + + + + + \ No newline at end of file diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt deleted file mode 100644 index 770adeba..00000000 --- a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import kotlin.test.assertEquals - -object DummyTest : Spek({ - on("sum of 2 and 3") { - val sum = 2 + 3 - it("outcome should be equals 5"){ - assertEquals(5, sum) - } - } -}) diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt new file mode 100644 index 00000000..5a89da48 --- /dev/null +++ b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt @@ -0,0 +1,31 @@ +package org.onap.dcae.collectors.veshv.main + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.main.impl.MessageFactory +import kotlin.test.assertEquals + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +object WireFrameTest : Spek({ + + val factory = MessageFactory() + + + given("no parameters") { + it("should return flux with one message") { + val result = factory.createMessageFlux() + + assertEquals(1, result.count().block()) + } + } + given("messages amount") { + it("should return message flux of specified size") { + val result = factory.createMessageFlux(5) + assertEquals(5, result.count().block()) + } + } +}) \ No newline at end of file -- cgit 1.2.3-korg