diff options
Diffstat (limited to 'hv-collector-client-simulator')
8 files changed, 217 insertions, 51 deletions
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index 86cdeca7..c1c1f2e8 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -144,6 +144,14 @@ <artifactId>logback-classic</artifactId> <scope>runtime</scope> </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + </dependency> + <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + </dependency> </dependencies> diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index 6f53c91d..b8a4b888 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -35,7 +35,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ - internal object DefaultValues { const val MESSAGES_AMOUNT = -1L const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt new file mode 100644 index 00000000..f993f45a --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/MessageParameters.kt @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.config + +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +data class MessageParameters(val commonEventHeader: CommonEventHeader, val amount: Long = -1) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt new file mode 100644 index 00000000..bc1cff7c --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -0,0 +1,96 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import reactor.core.publisher.Mono +import javax.json.Json +import javax.json.JsonObject + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class HttpServer(private val vesClient: VesHvClient) { + + fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable { + RatpackServer.of { + it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers) + } + }.doOnNext { it.start() } + + + private fun configureHandlers(chain: Chain) { + chain.post("simulator") { ctx -> + ctx.request.body + .map { Json.createReader(it.inputStream).readObject() } + .map { extractMessageParameters(it) } + .map { MessageFactory.createMessageFlux(it) } + .onError { handleException(it, ctx) } + .then { + vesClient.send(it) + ctx.response + .status(STATUS_OK) + .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() + .add("response", "Request accepted") + .build() + .toString()) + } + } + } + + private fun handleException(t: Throwable, ctx: Context) { + logger.warn("Failed to process the request - ${t.localizedMessage}") + logger.debug("Exception thrown when processing the request", t) + ctx.response + .status(STATUS_BAD_REQUEST) + .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() + .add("response", "Request was not accepted") + .add("exception", t.localizedMessage) + .build() + .toString()) + } + + private fun extractMessageParameters(request: JsonObject): MessageParameters = + try { + val commonEventHeader = MessageFactory + .parseCommonHeader(request.getJsonObject("commonEventHeader")) + val messagesAmount = request.getJsonNumber("messagesAmount").longValue() + MessageParameters(commonEventHeader, messagesAmount) + } catch (e: Exception) { + throw ValidationException("Validating request body failed", e) + } + + + companion object { + private val logger = Logger(HttpServer::class) + const val DEFAULT_PORT = 5000 + const val STATUS_OK = 200 + const val STATUS_BAD_REQUEST = 400 + const val CONTENT_TYPE_APPLICATION_JSON = "application/json" + } +} + +internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index 87a238a8..60117603 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -21,50 +21,53 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.ves.VesEventV5 +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import javax.json.JsonObject /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class MessageFactory { +object MessageFactory { - companion object { - const val DEFAULT_START_EPOCH: Long = 120034455 - const val DEFAULT_LAST_EPOCH: Long = 120034455 - } - fun createMessageFlux(amount: Long = -1): Flux<WireFrame> = - Mono.fromCallable(this::createMessage).let { - if (amount < 0) + fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = + Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { + if (messageParameters.amount < 0) it.repeat() else - it.repeat(amount) + it.repeat(messageParameters.amount) } + fun parseCommonHeader(json: JsonObject): CommonEventHeader = CommonEventHeader.newBuilder() + .setVersion(json.getString("version")) + .setDomain(CommonEventHeader.Domain.forNumber(json.getInt("domain"))) + .setSequence(json.getInt("sequence")) + .setPriority(CommonEventHeader.Priority.forNumber(json.getInt("priority"))) + .setEventId(json.getString("eventId")) + .setEventName(json.getString("eventName")) + .setEventType(json.getString("eventType")) + .setStartEpochMicrosec(json.getJsonNumber("startEpochMicrosec").longValue()) + .setLastEpochMicrosec(json.getJsonNumber("lastEpochMicrosec").longValue()) + .setNfNamingCode(json.getString("nfNamingCode")) + .setNfcNamingCode(json.getString("nfcNamingCode")) + .setReportingEntityId(json.getString("reportingEntityId")) + .setReportingEntityName(ByteString.copyFromUtf8(json.getString("reportingEntityName"))) + .setSourceId(ByteString.copyFromUtf8(json.getString("sourceId"))) + .setSourceName(json.getString("sourceName")) + .build() - private fun createMessage(): WireFrame { - val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() - .setVersion("1.9") - .setEventName("Sample event name") - .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS) - .setEventId("Sample event Id") - .setSourceName("Sample Source") - .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) - .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM) - .setStartEpochMicrosec(DEFAULT_START_EPOCH) - .setLastEpochMicrosec(DEFAULT_LAST_EPOCH) - .setSequence(2) - .build() - val payload = vesMessageBytes(commonHeader) - return WireFrame(payload) - } + private fun createMessage(commonHeader: CommonEventHeader): WireFrame = + WireFrame(vesMessageBytes(commonHeader)) + - private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray { - val msg = VesEventV5.VesEvent.newBuilder() + private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray { + val msg = VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 13256c52..78a72d93 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -32,17 +32,15 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient -import java.util.function.BiFunction /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class VesHvClient(configuration: ClientConfiguration) { +class VesHvClient(private val configuration: ClientConfiguration) { private val client: TcpClient = TcpClient.builder() .options { opts -> @@ -52,18 +50,17 @@ class VesHvClient(configuration: ClientConfiguration) { } .build() - fun send(messages: Flux<WireFrame>) { - client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) }) - } + fun send(messages: Flux<WireFrame>) = + client + .newHandler { _, out -> handler(out, messages) } + .doOnError{logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}")} + .subscribe { logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") } + - private fun handler(nettyInbound: NettyInbound, - nettyOutbound: NettyOutbound, - messages: Flux<WireFrame>): Publisher<Void> { + private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> { - nettyInbound - .receive() - .asString(Charsets.UTF_8) - .subscribe { str -> logger.info("Server response: $str") } val encoder = WireFrameEncoder(nettyOutbound.alloc()) @@ -74,6 +71,7 @@ class VesHvClient(configuration: ClientConfiguration) { return nettyOutbound .options { it.flushOnEach() } .send(frames) + .then { logger.info("Messages have been sent") } } private fun createSslContext(config: SecurityConfiguration): SslContext = diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 3fa023bf..08bc6a71 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException import org.slf4j.LoggerFactory.getLogger @@ -35,9 +35,12 @@ private val logger = getLogger("Simulator :: main") fun main(args: Array<String>) { try { val clientConfig = ArgBasedClientConfiguration().parse(args) - val messageFactory = MessageFactory() - val client = VesHvClient(clientConfig) - client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) + val vesClient = VesHvClient(clientConfig) + + HttpServer(vesClient) + .start() + .block() + } catch (e: WrongArgumentException) { e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") System.exit(1) diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt index 405a15eb..ee1d1cf2 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt @@ -19,34 +19,65 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import com.google.protobuf.ByteString import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import org.onap.ves.VesEventV5 +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM import reactor.test.test +const val SAMPLE_START_EPOCH: Long = 120034455 +const val SAMPLE_LAST_EPOCH: Long = 120034455 + /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ object MessageFactoryTest : Spek({ describe("message factory") { - val factory = MessageFactory() - given("no parameters") { + val factory = MessageFactory + + given("only common header") { it("should return infinite flux") { val limit = 1000L - factory.createMessageFlux().take(limit).test() + factory.createMessageFlux(getSampleMessageParameters()).take(limit).test() .expectNextCount(limit) .verifyComplete() } } - given("messages amount") { + given("common header and messages amount") { it("should return message flux of specified size") { - factory.createMessageFlux(5).test() + factory.createMessageFlux((getSampleMessageParameters(5))).test() .expectNextCount(5) .verifyComplete() } } } }) + +fun getSampleMessageParameters(amount: Long = -1): MessageParameters{ + val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() + .setVersion("sample-version") + .setDomain(HVRANMEAS) + .setSequence(1) + .setPriority(MEDIUM) + .setEventId("sample-event-id") + .setEventName("sample-event-name") + .setEventType("sample-event-type") + .setStartEpochMicrosec(SAMPLE_START_EPOCH) + .setLastEpochMicrosec(SAMPLE_LAST_EPOCH) + .setNfNamingCode("sample-nf-naming-code") + .setNfcNamingCode("sample-nfc-naming-code") + .setReportingEntityId("sample-reporting-entity-id") + .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setSourceId(ByteString.copyFromUtf8("sample-source-id")) + .setSourceName("sample-source-name") + .build() + + return MessageParameters(commonHeader, amount) +} |