aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-05-29 14:46:27 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 09:18:45 +0200
commit7bc4137767f88c2aae8f966065c5bea083234190 (patch)
tree0e46ad6fe06c817da5ba32340b3be07e829bd241
parent067c44618a7daa38c14fe42902c43b8dddd348f9 (diff)
Implemented simple tcp client
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Change-Id: Iaf913186b93eb7eebfb6f44c19d489a64ed60c2b Issue-ID: DCAEGEN2-601
-rw-r--r--hv-collector-client-simulator/pom.xml21
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt83
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt7
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt59
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt42
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt21
-rw-r--r--hv-collector-client-simulator/src/main/resources/logback.xml35
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt34
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt31
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt2
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt4
11 files changed, 298 insertions, 41 deletions
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml
index 11d287c4..caa1099b 100644
--- a/hv-collector-client-simulator/pom.xml
+++ b/hv-collector-client-simulator/pom.xml
@@ -59,6 +59,25 @@
<dependencies>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>protobuf</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -97,7 +116,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
- <scope>test</scope>
+ <scope>runtime</scope>
</dependency>
</dependencies>
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt
new file mode 100644
index 00000000..7c28edac
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt
@@ -0,0 +1,83 @@
+package org.onap.dcae.collectors.veshv.main.config
+
+import org.apache.commons.cli.Option
+import org.apache.commons.cli.Options
+import org.apache.commons.cli.DefaultParser
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.HelpFormatter
+
+
+internal object DefaultValues {
+ const val MESSAGES_AMOUNT = 1
+}
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal object ArgBasedClientConfiguration {
+
+ private val OPT_VES_PORT = Option.builder("p")
+ .longOpt("port")
+ .required()
+ .hasArg()
+ .desc("VesHvCollector port")
+ .build()
+
+ private val OPT_VES_HOST = Option.builder("h")
+ .longOpt("host")
+ .required()
+ .hasArg()
+ .desc("VesHvCollector host")
+ .build()
+
+ private val OPT_MESSAGES_AMOUNT = Option.builder("m")
+ .longOpt("messages")
+ .hasArg()
+ .desc("Amount of messages to send")
+ .build()
+
+ private val options by lazy {
+ val options = Options()
+ options.addOption(OPT_VES_PORT)
+ options.addOption(OPT_VES_HOST)
+ options.addOption(OPT_MESSAGES_AMOUNT)
+ options
+ }
+
+ fun parse(args: Array<out String>): ClientConfiguration {
+ val parser = DefaultParser()
+
+ try {
+ parser.parse(options, args).run {
+ return ClientConfiguration(
+ stringValue(OPT_VES_HOST),
+ intValue(OPT_VES_PORT),
+ intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT))
+ }
+ } catch (ex: Exception) {
+ throw WrongArgumentException(ex)
+ }
+ }
+
+ private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
+ getOptionValue(option.opt)?.toInt() ?: default
+
+ private fun CommandLine.intValue(option: Option) =
+ getOptionValue(option.opt).toInt()
+
+ private fun CommandLine.stringValue(option: Option) =
+ getOptionValue(option.opt)
+
+
+ class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) {
+ fun printMessage() {
+ println(message)
+ }
+
+ fun printHelp(programName: String) {
+ val formatter = HelpFormatter()
+ formatter.printHelp(programName, options)
+ }
+ }
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt
new file mode 100644
index 00000000..742c2869
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt
@@ -0,0 +1,7 @@
+package org.onap.dcae.collectors.veshv.main.config
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt
new file mode 100644
index 00000000..e0c53ae3
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/MessageFactory.kt
@@ -0,0 +1,59 @@
+package org.onap.dcae.collectors.veshv.main.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.ves.VesEventV5
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class MessageFactory {
+
+ companion object {
+ const val DEFAULT_START_EPOCH: Long = 120034455
+ const val DEFAULT_LAST_EPOCH: Long = 120034455
+ }
+
+ fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
+ Mono.just(createMessage()).repeat(amount.toLong())
+
+
+ private fun createMessage(): WireFrame {
+ val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
+ .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM)
+ .setStartEpochMicrosec(DEFAULT_START_EPOCH)
+ .setLastEpochMicrosec(DEFAULT_LAST_EPOCH)
+ .setSequence(2)
+ .build()
+
+ val payload = vesMessageBytes(commonHeader)
+ return WireFrame(
+ payload = payload,
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 2,
+ payloadSize = payload.readableBytes())
+
+
+ }
+
+ private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
+ val msg = VesEventV5.VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
+ .build()
+
+ return Unpooled.wrappedBuffer(msg.toByteArray())
+ }
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt
new file mode 100644
index 00000000..d23a6f7e
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt
@@ -0,0 +1,42 @@
+package org.onap.dcae.collectors.veshv.main.impl
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.ipc.netty.NettyInbound
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+import java.util.function.BiFunction
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(configuration: ClientConfiguration) {
+
+ private val logger = Logger(VesHvClient::class)
+ private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort)
+
+ fun send(messages: Flux<WireFrame>) {
+ client.start(BiFunction { i, o -> handler(i, o, messages) })
+ }
+
+ // sending flux with multiple WireFrames not supported yet
+ private fun handler(nettyInbound: NettyInbound,
+ nettyOutbound: NettyOutbound,
+ messages: Flux<WireFrame>): Publisher<Void> {
+
+ nettyInbound
+ .receive()
+ .asString(Charsets.UTF_8)
+ .subscribe { str -> logger.info("Server response: $str") }
+
+ return nettyOutbound
+ .options { it.flushOnEach() }
+ .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) })
+ }
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt
index 35710c06..137ffdff 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt
@@ -19,11 +19,26 @@
*/
package org.onap.dcae.collectors.veshv.main
+import org.onap.dcae.collectors.veshv.main.config.ArgBasedClientConfiguration
+import org.onap.dcae.collectors.veshv.main.impl.MessageFactory
+import org.onap.dcae.collectors.veshv.main.impl.VesHvClient
import org.slf4j.LoggerFactory.getLogger
private val logger = getLogger("Simulator :: main")
-fun main(args : Array<String>){
- logger.info("Hello world")
-} \ No newline at end of file
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+fun main(args: Array<String>) = try {
+
+ val clientConfig = ArgBasedClientConfiguration.parse(args)
+ val messageFactory = MessageFactory()
+ val client = VesHvClient(clientConfig)
+ client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+} catch (e: Exception) {
+ logger.error(e.localizedMessage)
+ logger.debug("An error occurred when starting ves client", e)
+}
+
diff --git a/hv-collector-client-simulator/src/main/resources/logback.xml b/hv-collector-client-simulator/src/main/resources/logback.xml
new file mode 100644
index 00000000..809f62d4
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt
deleted file mode 100644
index 770adeba..00000000
--- a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/DummyTest.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main
-
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import kotlin.test.assertEquals
-
-object DummyTest : Spek({
- on("sum of 2 and 3") {
- val sum = 2 + 3
- it("outcome should be equals 5"){
- assertEquals(5, sum)
- }
- }
-})
diff --git a/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt
new file mode 100644
index 00000000..5a89da48
--- /dev/null
+++ b/hv-collector-client-simulator/src/test/kotlin/org.onap.dcae.collectors.veshv.main/MessageFactoryTest.kt
@@ -0,0 +1,31 @@
+package org.onap.dcae.collectors.veshv.main
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.main.impl.MessageFactory
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+object WireFrameTest : Spek({
+
+ val factory = MessageFactory()
+
+
+ given("no parameters") {
+ it("should return flux with one message") {
+ val result = factory.createMessageFlux()
+
+ assertEquals(1, result.count().block())
+ }
+ }
+ given("messages amount") {
+ it("should return message flux of specified size") {
+ val result = factory.createMessageFlux(5)
+ assertEquals(5, result.count().block())
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
index ea78706e..cf484d7c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.domain
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class ServerConfiguration(val port: Int, val configurationUrl: String)
+data class ServerConfiguration( val configurationUrl: String, val port: Int)
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
index 6311b6c0..4e614cdb 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -54,8 +54,8 @@ internal object ArgBasedServerConfiguration {
try {
parser.parse(options, args).run {
return ServerConfiguration(
- intValue(OPT_PORT, DefaultValues.PORT),
- stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL))
+ stringValue(OPT_CONFIG_URL, DefaultValues.CONFIG_URL),
+ intValue(OPT_PORT, DefaultValues.PORT))
}
} catch (ex: Exception) {
throw WrongArgumentException(ex)