aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-client-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-client-simulator')
-rw-r--r--hv-collector-client-simulator/pom.xml8
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt1
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt28
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt96
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt59
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt24
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt11
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt41
8 files changed, 217 insertions, 51 deletions
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml
index 86cdeca7..c1c1f2e8 100644
--- a/hv-collector-client-simulator/pom.xml
+++ b/hv-collector-client-simulator/pom.xml
@@ -144,6 +144,14 @@
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ </dependency>
</dependencies>
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
index 6f53c91d..b8a4b888 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
@@ -35,7 +35,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-
internal object DefaultValues {
const val MESSAGES_AMOUNT = -1L
const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt
new file mode 100644
index 00000000..f993f45a
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.config
+
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
new file mode 100644
index 00000000..bc1cff7c
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import javax.json.Json
+import javax.json.JsonObject
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HttpServer(private val vesClient: VesHvClient) {
+
+ fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
+ RatpackServer.of {
+ it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+ }
+ }.doOnNext { it.start() }
+
+
+ private fun configureHandlers(chain: Chain) {
+ chain.post("simulator") { ctx ->
+ ctx.request.body
+ .map { Json.createReader(it.inputStream).readObject() }
+ .map { extractMessageParameters(it) }
+ .map { MessageFactory.createMessageFlux(it) }
+ .onError { handleException(it, ctx) }
+ .then {
+ vesClient.send(it)
+ ctx.response
+ .status(STATUS_OK)
+ .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+ .add("response", "Request accepted")
+ .build()
+ .toString())
+ }
+ }
+ }
+
+ private fun handleException(t: Throwable, ctx: Context) {
+ logger.warn("Failed to process the request - ${t.localizedMessage}")
+ logger.debug("Exception thrown when processing the request", t)
+ ctx.response
+ .status(STATUS_BAD_REQUEST)
+ .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+ .add("response", "Request was not accepted")
+ .add("exception", t.localizedMessage)
+ .build()
+ .toString())
+ }
+
+ private fun extractMessageParameters(request: JsonObject): MessageParameters =
+ try {
+ val commonEventHeader = MessageFactory
+ .parseCommonHeader(request.getJsonObject("commonEventHeader"))
+ val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
+ MessageParameters(commonEventHeader, messagesAmount)
+ } catch (e: Exception) {
+ throw ValidationException("Validating request body failed", e)
+ }
+
+
+ companion object {
+ private val logger = Logger(HttpServer::class)
+ const val DEFAULT_PORT = 5000
+ const val STATUS_OK = 200
+ const val STATUS_BAD_REQUEST = 400
+ const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
+ }
+}
+
+internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
index 87a238a8..60117603 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
@@ -21,50 +21,53 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import javax.json.JsonObject
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class MessageFactory {
+object MessageFactory {
- companion object {
- const val DEFAULT_START_EPOCH: Long = 120034455
- const val DEFAULT_LAST_EPOCH: Long = 120034455
- }
- fun createMessageFlux(amount: Long = -1): Flux<WireFrame> =
- Mono.fromCallable(this::createMessage).let {
- if (amount < 0)
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
+ if (messageParameters.amount < 0)
it.repeat()
else
- it.repeat(amount)
+ it.repeat(messageParameters.amount)
}
+ fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder()
+ .setVersion(json.getString("version"))
+ .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain")))
+ .setSequence(json.getInt("sequence"))
+ .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority")))
+ .setEventId(json.getString("eventId"))
+ .setEventName(json.getString("eventName"))
+ .setEventType(json.getString("eventType"))
+ .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue())
+ .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue())
+ .setNfNamingCode(json.getString("nfNamingCode"))
+ .setNfcNamingCode(json.getString("nfcNamingCode"))
+ .setReportingEntityId(json.getString("reportingEntityId"))
+ .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName")))
+ .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId")))
+ .setSourceName(json.getString("sourceName"))
+ .build()
- private fun createMessage(): WireFrame {
- val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
- .setVersion("1.9")
- .setEventName("Sample event name")
- .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
- .setEventId("Sample event Id")
- .setSourceName("Sample Source")
- .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
- .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM)
- .setStartEpochMicrosec(DEFAULT_START_EPOCH)
- .setLastEpochMicrosec(DEFAULT_LAST_EPOCH)
- .setSequence(2)
- .build()
- val payload = vesMessageBytes(commonHeader)
- return WireFrame(payload)
- }
+ private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
+ WireFrame(vesMessageBytes(commonHeader))
+
- private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray {
- val msg = VesEventV5.VesEvent.newBuilder()
+ private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray {
+ val msg = VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
.build()
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 13256c52..78a72d93 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -32,17 +32,15 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
-import java.util.function.BiFunction
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class VesHvClient(configuration: ClientConfiguration) {
+class VesHvClient(private val configuration: ClientConfiguration) {
private val client: TcpClient = TcpClient.builder()
.options { opts ->
@@ -52,18 +50,17 @@ class VesHvClient(configuration: ClientConfiguration) {
}
.build()
- fun send(messages: Flux<WireFrame>) {
- client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
- }
+ fun send(messages: Flux<WireFrame>) =
+ client
+ .newHandler { _, out -> handler(out, messages) }
+ .doOnError{logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")}
+ .subscribe { logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}") }
+
- private fun handler(nettyInbound: NettyInbound,
- nettyOutbound: NettyOutbound,
- messages: Flux<WireFrame>): Publisher<Void> {
+ private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
- nettyInbound
- .receive()
- .asString(Charsets.UTF_8)
- .subscribe { str -> logger.info("Server response: $str") }
val encoder = WireFrameEncoder(nettyOutbound.alloc())
@@ -74,6 +71,7 @@ class VesHvClient(configuration: ClientConfiguration) {
return nettyOutbound
.options { it.flushOnEach() }
.send(frames)
+ .then { logger.info("Messages have been sent") }
}
private fun createSslContext(config: SecurityConfiguration): SslContext =
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 3fa023bf..08bc6a71 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
import org.slf4j.LoggerFactory.getLogger
@@ -35,9 +35,12 @@ private val logger = getLogger("Simulator :: main")
fun main(args: Array<String>) {
try {
val clientConfig = ArgBasedClientConfiguration().parse(args)
- val messageFactory = MessageFactory()
- val client = VesHvClient(clientConfig)
- client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+ val vesClient = VesHvClient(clientConfig)
+
+ HttpServer(vesClient)
+ .start()
+ .block()
+
} catch (e: WrongArgumentException) {
e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
System.exit(1)
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
index 405a15eb..ee1d1cf2 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
@@ -19,34 +19,65 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import com.google.protobuf.ByteString
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM
import reactor.test.test
+const val SAMPLE_START_EPOCH: Long = 120034455
+const val SAMPLE_LAST_EPOCH: Long = 120034455
+
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
object MessageFactoryTest : Spek({
describe("message factory") {
- val factory = MessageFactory()
- given("no parameters") {
+ val factory = MessageFactory
+
+ given("only common header") {
it("should return infinite flux") {
val limit = 1000L
- factory.createMessageFlux().take(limit).test()
+ factory.createMessageFlux(getSampleMessageParameters()).take(limit).test()
.expectNextCount(limit)
.verifyComplete()
}
}
- given("messages amount") {
+ given("common header and messages amount") {
it("should return message flux of specified size") {
- factory.createMessageFlux(5).test()
+ factory.createMessageFlux((getSampleMessageParameters(5))).test()
.expectNextCount(5)
.verifyComplete()
}
}
}
})
+
+fun getSampleMessageParameters(amount: Long = -1): MessageParameters{
+ val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+ .setVersion("sample-version")
+ .setDomain(HVRANMEAS)
+ .setSequence(1)
+ .setPriority(MEDIUM)
+ .setEventId("sample-event-id")
+ .setEventName("sample-event-name")
+ .setEventType("sample-event-type")
+ .setStartEpochMicrosec(SAMPLE_START_EPOCH)
+ .setLastEpochMicrosec(SAMPLE_LAST_EPOCH)
+ .setNfNamingCode("sample-nf-naming-code")
+ .setNfcNamingCode("sample-nfc-naming-code")
+ .setReportingEntityId("sample-reporting-entity-id")
+ .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
+ .setSourceName("sample-source-name")
+ .build()
+
+ return MessageParameters(commonHeader, amount)
+}