diff options
38 files changed, 1037 insertions, 321 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 33aedeca..f9f52b4e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,7 +28,7 @@ services: command: ["-server", "-bootstrap", "-ui-dir", "/ui"] ves-hv-collector: - image: nexus3.onap.org:10003/onap/ves-hv-collector:latest + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest # build: # context: hv-collector-main # dockerfile: Dockerfile @@ -51,7 +51,7 @@ services: - ./ssl/:/etc/ves-hv/ xnf-simulator: - image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator # build: # context: hv-collector-xnf-simulator # dockerfile: Dockerfile @@ -64,7 +64,7 @@ services: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: - image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator + image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator # build: # context: hv-collector-dcae-app-simulator # dockerfile: Dockerfile diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 1e22d4c0..ba29844a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -149,7 +149,7 @@ object PerformanceSpecification : Spek({ val outputDigest = digest.digest() - assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize) assertThat(outputDigest).isEqualTo(inputDigest) } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 239f7102..354edaeb 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -54,10 +54,14 @@ class MessageStreamValidation( val expectations = Json.createReader(input).readArray() val messageParams = messageParametersParser.parse(expectations) - if (messageParams.isEmpty()) - throw IllegalArgumentException("Message param list cannot be empty") - - return messageParams + return messageParams.fold( + { throw IllegalArgumentException("Parsing error: " + it.message) }, + { + if (it.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + it + } + ) } private fun shouldValidatePayloads(parameters: List<MessageParameters>) = diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 6c830b9d..1eca9317 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -19,18 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import arrow.core.Left -import arrow.core.Right import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.monad -import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import ratpack.exec.Promise +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendOrError import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.http.Response import ratpack.server.RatpackServer import ratpack.server.ServerConfig @@ -38,7 +34,21 @@ import ratpack.server.ServerConfig * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class ApiServer(private val simulator: DcaeAppSimulator) { +class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = "validation succeeded" + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = "validation failed", + httpStatus = HttpStatus.BAD_REQUEST + ) + } fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = @@ -52,10 +62,10 @@ class ApiServer(private val simulator: DcaeAppSimulator) { private fun setupHandlers(chain: Chain) { chain .put("configuration/topics") { ctx -> - val operation = ctx.bodyIo().flatMap { body -> - simulator.listenToTopics(body.text) + ctx.request.body.then { body -> + val operation = simulator.listenToTopics(body.text) + ctx.response.sendOrError(operation) } - ctx.response.sendOrError(operation) } .delete("messages") { ctx -> @@ -64,7 +74,7 @@ class ApiServer(private val simulator: DcaeAppSimulator) { } .get("messages/all/count") { ctx -> simulator.state().fold( - { ctx.response.status(STATUS_NOT_FOUND) }, + { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) }, { ctx.response .contentType(CONTENT_TEXT) @@ -72,58 +82,20 @@ class ApiServer(private val simulator: DcaeAppSimulator) { }) } .post("messages/all/validate") { ctx -> - val responseStatus = IO.monad().binding { - val body = ctx.bodyIo().bind() - val isValid = simulator.validate(body.inputStream).bind() - if (isValid) - STATUS_OK - else - STATUS_BAD_REQUEST - }.fix() - - ctx.response.sendStatusOrError(responseStatus) + ctx.request.body.then { body -> + val response = simulator.validate(body.inputStream) + .map { isValid -> + if (isValid) responseValid else responseInvalid + } + ctx.response.sendAndHandleErrors(response) + } } .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() + ctx.response.status(HttpConstants.STATUS_OK).send() } } - private fun Context.bodyIo() = request.body.asIo() - - private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult -> - onError { - emitResult(Left(it)) - }.then { result -> - emitResult(Right(result)) - } - } - - private fun Response.sendOrError(responseStatus: IO<Unit>) { - sendStatusOrError(responseStatus.map { STATUS_OK }) - } - - private fun Response.sendStatusOrError(responseStatus: IO<Int>) { - responseStatus.unsafeRunAsync { cb -> - cb.fold( - { err -> - logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err) - status(ApiServer.STATUS_INTERNAL_SERVER_ERROR) - .send(CONTENT_TEXT, err.message) - }, - { - status(it).send() - } - ) - } - } - companion object { - private val logger = Logger(ApiServer::class) private const val CONTENT_TEXT = "text/plain" - - private const val STATUS_OK = 200 - private const val STATUS_BAD_REQUEST = 400 - private const val STATUS_NOT_FOUND = 404 - private const val STATUS_INTERNAL_SERVER_ERROR = 500 } } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index a65a2686..c0f8b340 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -24,11 +24,10 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.void +import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -52,7 +51,7 @@ fun main(args: Array<String>) = private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) + return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) .start(config.apiPort, config.kafkaTopics) - .void() + .unit() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 0bdd1159..2932367b 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.Either import arrow.core.Left import arrow.core.None +import arrow.core.Right import arrow.core.Some import arrow.effects.IO import javax.json.stream.JsonParsingException @@ -67,7 +68,7 @@ internal class MessageStreamValidationTest : Spek({ } fun givenParsedMessageParameters(vararg params: MessageParameters) { - whenever(messageParametersParser.parse(any())).thenReturn(params.toList()) + whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList())) } describe("validate") { diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt index 79fc9321..1adf0cad 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.api -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -enum class HealthStatus(val httpResponseStatus: Int) { - UP(OK), - DOWN(SERVICE_UNAVAILABLE), - OUT_OF_SERVICE(SERVICE_UNAVAILABLE), - UNKNOWN(SERVICE_UNAVAILABLE) +enum class HealthStatus(val httpResponseStatus: HttpStatus) { + UP(HttpStatus.OK), + DOWN(HttpStatus.SERVICE_UNAVAILABLE), + OUT_OF_SERVICE(HttpStatus.SERVICE_UNAVAILABLE), + UNKNOWN(HttpStatus.SERVICE_UNAVAILABLE) } diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index 7e9efac7..753f73ef 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -51,7 +51,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) = healthDescription.get().run { - resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message)) + resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message)) } private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) = diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml index 5127e7ef..a0235e17 100644 --- a/hv-collector-main/src/main/resources/logback.xml +++ b/hv-collector-main/src/main/resources/logback.xml @@ -27,9 +27,9 @@ </appender> <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/> <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> <root level="INFO"> diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml index 3960e399..3b6c0e89 100644 --- a/hv-collector-test-utils/pom.xml +++ b/hv-collector-test-utils/pom.xml @@ -51,5 +51,10 @@ <version>${project.parent.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt new file mode 100644 index 00000000..54913744 --- /dev/null +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Either +import arrow.core.identity +import org.assertj.core.api.AbstractAssert +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.ObjectAssert + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +class EitherAssert<A, B>(actual: Either<A, B>) + : AbstractAssert<EitherAssert<A, B>, Either<A, B>>(actual, EitherAssert::class.java) { + + fun isLeft(): EitherAssert<A, B> { + isNotNull() + isInstanceOf(Either.Left::class.java) + return myself + } + + fun left(): ObjectAssert<A> { + isLeft() + val left = actual.fold( + ::identity, + { throw AssertionError("should be left") }) + return assertThat(left) + } + + fun isRight(): EitherAssert<A, B> { + isNotNull() + isInstanceOf(Either.Right::class.java) + return myself + } + + fun right(): ObjectAssert<B> { + isRight() + val right = actual.fold( + { throw AssertionError("should be right") }, + ::identity) + return assertThat(right) + } +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt index 081dd0da..d017b31b 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt @@ -17,15 +17,39 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.http +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Either +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.time.Duration /** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since August 2018 + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 */ -class Status { - companion object { - const val OK = 200 - const val SERVICE_UNAVAILABLE = 503 + +private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils") + +object Assertions : org.assertj.core.api.Assertions() { + fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual) +} + + +fun waitUntilSucceeds(action: () -> Unit) = waitUntilSucceeds(50, Duration.ofMillis(10), action) + +fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) { + var tryNum = 0 + while (tryNum <= retries) { + tryNum++ + try { + logger.debug("Try number $tryNum") + action() + break + } catch (ex: Throwable) { + if (tryNum >= retries) + throw ex + else + Thread.sleep(sleepTime.toMillis()) + } } } diff --git a/hv-collector-test-utils/src/main/kotlin/configurations.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt index 57843b45..57843b45 100644 --- a/hv-collector-test-utils/src/main/kotlin/configurations.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt diff --git a/hv-collector-test-utils/src/main/kotlin/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index c6aa89b2..c6aa89b2 100644 --- a/hv-collector-test-utils/src/main/kotlin/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt diff --git a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index 6aeb6206..6aeb6206 100644 --- a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt diff --git a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker index ca6ee9ce..ca6ee9ce 100644 --- a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker +++ b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index f1b7f061..81daf9b2 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -85,6 +85,20 @@ <artifactId>arrow-syntax</artifactId> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + </dependency> + <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -121,7 +135,10 @@ <artifactId>logback-classic</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <scope>provided</scope> + </dependency> </dependencies> - - </project>
\ No newline at end of file diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 2d538b72..a99fef5e 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -35,6 +35,10 @@ import java.util.concurrent.atomic.AtomicReference fun <A> Either<A, A>.flatten() = fold(::identity, ::identity) +fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) + +fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity) + fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get()) fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> = diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt index cef537e8..05d13094 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt @@ -50,10 +50,14 @@ fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitC flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) -fun IO<Any>.void() = map { Unit } +fun IO<Any>.unit() = map { Unit } -fun <T> Mono<T>.asIo() = IO.async<T> { proc -> - subscribe({ proc(Right(it)) }, { proc(Left(it)) }) +fun <T> Mono<T>.asIo() = IO.async<T> { callback -> + subscribe({ + callback(Right(it)) + }, { + callback(Left(it)) + }) } fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt new file mode 100644 index 00000000..c5c46397 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import arrow.typeclasses.Show +import java.util.* +import javax.json.Json + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +object HttpConstants { + const val STATUS_OK = 200 + const val STATUS_ACCEPTED = 202 + const val STATUS_BAD_REQUEST = 400 + const val STATUS_NOT_FOUND = 404 + const val STATUS_INTERNAL_SERVER_ERROR = 500 + const val STATUS_SERVICE_UNAVAILABLE = 503 + + const val CONTENT_TYPE_JSON = "application/json" + const val CONTENT_TYPE_TEXT = "text/plain" +} + +enum class HttpStatus(val number: Int) { + OK(HttpConstants.STATUS_OK), + ACCEPTED(HttpConstants.STATUS_ACCEPTED), + BAD_REQUEST(HttpConstants.STATUS_BAD_REQUEST), + NOT_FOUND(HttpConstants.STATUS_NOT_FOUND), + INTERNAL_SERVER_ERROR(HttpConstants.STATUS_INTERNAL_SERVER_ERROR), + SERVICE_UNAVAILABLE(HttpConstants.STATUS_SERVICE_UNAVAILABLE) +} + + +enum class ContentType(val value: String) { + JSON(HttpConstants.CONTENT_TYPE_JSON), + TEXT(HttpConstants.CONTENT_TYPE_TEXT) +} + +data class Response(val status: HttpStatus, val content: Content<Any>) +data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any()) + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +object Responses { + + fun acceptedResponse(id: UUID): Response { + return Response( + HttpStatus.ACCEPTED, + Content(ContentType.TEXT, id) + ) + } + + fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response { + return Response(httpStatus, + Content(ContentType.JSON, + Json.createObjectBuilder() + .add("status", name) + .add("message", message) + .build())) + } +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt new file mode 100644 index 00000000..0282d0c7 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import javax.json.Json + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ + +private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack") + +fun ratpack.http.Response.sendOrError(action: IO<Unit>) { + sendAndHandleErrors(action.map { + Response( + HttpStatus.OK, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("response", "Request accepted").build())) + }) +} + +fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) { + when(response) { + is Either.Left -> send(errorResponse(response.a.toString())) + is Either.Right -> sendAndHandleErrors(IO.just(response.b)) + } +} + +fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) { + response.attempt().unsafeRunSync().fold( + { err -> + logger.warn("Error occurred. Sending .", err) + val message = err.message + send(errorResponse(message)) + }, + ::send + ) +} + +private fun errorResponse(message: String?): Response { + return Response( + HttpStatus.INTERNAL_SERVER_ERROR, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("error", message).build())) +} + +fun ratpack.http.Response.send(response: Response) { + val respWithStatus = status(response.status.number) + response.content.apply { + respWithStatus.send( + type.value, + serializer.run { value.show() }) + } +} diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt new file mode 100644 index 00000000..f9f716a1 --- /dev/null +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt @@ -0,0 +1,101 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import java.util.* +import javax.json.JsonObject + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class ResponsesTest : Spek({ + describe("response factory") { + describe("accepted response") { + given("uuid") { + val uuid = UUID.randomUUID() + + on("calling acceptedResponse") { + val result = Responses.acceptedResponse(uuid) + + it ("should have ACCEPTED status") { + assertThat(result.status).isEqualTo(HttpStatus.ACCEPTED) + } + + it ("should have text body") { + assertThat(result.content.type).isEqualTo(ContentType.TEXT) + } + + it ("should contain UUID text in the body") { + val serialized = result.content.serializer.run { result.content.value.show() } + assertThat(serialized).isEqualTo(uuid.toString()) + } + } + } + } + describe("status response") { + given("all params are specified") { + val status = "ok" + val message = "good job" + val httpStatus = HttpStatus.OK + + on("calling statusResponse") { + val result = Responses.statusResponse(status, message, httpStatus) + val json = result.content.value as JsonObject + + it ("should have OK status") { + assertThat(result.status).isEqualTo(HttpStatus.OK) + } + + it ("should have json body") { + assertThat(result.content.type).isEqualTo(ContentType.JSON) + } + + it ("should contain status as string") { + assertThat(json.getString("status")).isEqualTo(status) + } + + it ("should contain message") { + assertThat(json.getString("message")).isEqualTo(message) + } + } + } + + given("default params are omitted") { + val status = "ok" + val message = "good job" + + on("calling statusResponse") { + val result = Responses.statusResponse(status, message) + + it ("should have OK status") { + assertThat(result.status).isEqualTo(HttpStatus.OK) + } + } + } + } + } +}) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt index 060f28a2..754fa31f 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt @@ -19,11 +19,13 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api +import arrow.core.Either +import arrow.core.Option import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl import javax.json.JsonArray interface MessageParametersParser { - fun parse(request: JsonArray): List<MessageParameters> + fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>> companion object { val INSTANCE: MessageParametersParser by lazy { @@ -31,3 +33,5 @@ interface MessageParametersParser { } } } + +data class ParsingError(val message: String, val cause: Option<Throwable>) diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt index 5b328f1c..f3095618 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt @@ -19,9 +19,12 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.impl +import arrow.core.Option +import arrow.core.Try import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import javax.json.JsonArray /** @@ -32,8 +35,8 @@ internal class MessageParametersParserImpl( private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser() ) : MessageParametersParser { - override fun parse(request: JsonArray): List<MessageParameters> = - try { + override fun parse(request: JsonArray) = + Try { request .map { it.asJsonObject() } .map { @@ -41,13 +44,13 @@ internal class MessageParametersParserImpl( .parse(it.getJsonObject("commonEventHeader")) val messageType = MessageType.valueOf(it.getString("messageType")) val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue() - ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.", - NullPointerException()) + ?: throw NullPointerException("\"messagesAmount\" could not be parsed from message.") MessageParameters(commonEventHeader, messageType, messagesAmount) } - } catch (e: Exception) { - throw ParsingException("Parsing request body failed", e) + }.toEither().mapLeft { ex -> + ParsingError( + ex.message ?: "Unable to parse message parameters", + Option.fromNullable(ex)) } - internal class ParsingException(message: String, cause: Exception) : Exception(message, cause) } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt index 92561995..3b1a48b3 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl.ParsingException import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl @@ -45,18 +44,20 @@ object MessageParametersParserTest : Spek({ it("should parse MessagesParameters object successfully") { val result = messageParametersParser.parse(validMessagesParametesJson()) - assertThat(result).isNotNull - assertThat(result).hasSize(2) - val firstMessage = result.first() - assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID) - assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT) + result.fold({ fail("should have succeeded") }) { rightResult -> + assertThat(rightResult).hasSize(2) + val firstMessage = rightResult.first() + assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID) + assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT) + + } } } + on("invalid parameters json") { it("should throw exception") { - assertThatExceptionOfType(ParsingException::class.java).isThrownBy { - messageParametersParser.parse(invalidMessagesParametesJson()) - } + val result = messageParametersParser.parse(invalidMessagesParametesJson()) + assertThat(result.isLeft()).describedAs("is left").isTrue() } } } diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml index d44e2511..cfe1dc14 100644 --- a/hv-collector-xnf-simulator/pom.xml +++ b/hv-collector-xnf-simulator/pom.xml @@ -106,6 +106,10 @@ <artifactId>arrow-effects</artifactId> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt deleted file mode 100644 index 02e6ee72..00000000 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.scheduler.Schedulers -import javax.json.Json - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 - */ -internal class HttpServer(private val vesClient: XnfSimulator, - private val messageParametersParser: MessageParametersParser = INSTANCE) { - - fun start(port: Int): IO<RatpackServer> = IO { - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::configureHandlers) - } - } - - private fun configureHandlers(chain: Chain) { - chain - .post("simulator/sync") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendIo(it) } - .map { it.unsafeRunSync() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .post("simulator/async") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendRx(it) } - .map { it.subscribeOn(Schedulers.elastic()).subscribe() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun sendAcceptedResponse(ctx: Context) { - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - - private fun handleException(t: Throwable, ctx: Context) { - logger.warn("Failed to process the request - ${t.localizedMessage}") - logger.debug("Exception thrown when processing the request", t) - ctx.response - .status(STATUS_BAD_REQUEST) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request was not accepted") - .add("exception", t.localizedMessage) - .build() - .toString()) - } - - companion object { - private val logger = Logger(HttpServer::class) - const val STATUS_OK = 200 - const val STATUS_BAD_REQUEST = 400 - const val CONTENT_TYPE_APPLICATION_JSON = "application/json" - } -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index e8a474d0..558bd1c1 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -19,98 +19,40 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.core.Either +import arrow.core.Some +import arrow.core.Try +import arrow.core.fix +import arrow.core.flatMap +import arrow.core.monad import arrow.effects.IO -import io.netty.handler.ssl.ClientAuth -import io.netty.handler.ssl.SslContext -import io.netty.handler.ssl.SslContextBuilder -import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient - +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import java.io.InputStream +import javax.json.Json /** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 */ -internal class XnfSimulator(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security)) - } - .build() - - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> { - sendRx(messages).block() - } - - fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { - val complete = ReplayProcessor.create<Void>(1) - client - .newHandler { _, output -> handler(complete, messages, output) } - .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, - nettyOutbound: NettyOutbound): Publisher<Void> { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(MAX_BATCH_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) - .then { - logger.info("Messages have been sent") - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): SslContext = - SslContextBuilder.forClient() - .keyManager(config.cert.toFile(), config.privateKey.toFile()) - .trustManager(config.trustedCert.toFile()) - .sslProvider(SslProvider.OPENSSL) - .clientAuth(ClientAuth.REQUIRE) - .build() - - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } - - companion object { - private val logger = Logger(XnfSimulator::class) - private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE - } +class XnfSimulator( + private val vesClient: VesHvClient, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = + Either.monad<ParsingError>().binding { + val json = parseJsonArray(messageParameters).bind() + val parsed = messageParametersParser.parse(json).bind() + val generatedMessages = messageGenerator.createMessageFlux(parsed) + vesClient.sendIo(generatedMessages) + }.fix() + + private fun parseJsonArray(jsonStream: InputStream) = + Try { + Json.createReader(jsonStream).readArray() + }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt new file mode 100644 index 00000000..22e47d75 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpClient + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class VesHvClient(private val configuration: SimulatorConfiguration) { + + private val client: TcpClient = TcpClient.builder() + .options { opts -> + opts.host(configuration.vesHost) + .port(configuration.vesPort) + .sslContext(createSslContext(configuration.security)) + } + .build() + + fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + sendRx(messages).then(Mono.just(Unit)).asIo() + + private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + val complete = ReplayProcessor.create<Void>(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } + + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<PayloadWireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { + + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) + val frames = messages + .map(encoder::encode) + .window(MAX_BATCH_SIZE) + + return nettyOutbound + .logConnectionClosed() + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() + } + + private fun createSslContext(config: SecurityConfiguration): SslContext = + SslContextBuilder.forClient() + .keyManager(config.cert.toFile(), config.privateKey.toFile()) + .trustManager(config.trustedCert.toFile()) + .sslProvider(SslProvider.OPENSSL) + .clientAuth(ClientAuth.REQUIRE) + .build() + + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + + companion object { + private val logger = Logger(VesHvClient::class) + private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt new file mode 100644 index 00000000..54ead6f7 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.utils.http.Content +import org.onap.dcae.collectors.veshv.utils.http.ContentType +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Response +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.TypedData +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import java.util.* +import javax.json.Json + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +internal class XnfApiServer( + private val xnfSimulator: XnfSimulator, + private val ongoingSimulations: OngoingSimulations) { + + fun start(port: Int): IO<RatpackServer> = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) + } + } + + private fun configureHandlers(chain: Chain) { + chain + .post("simulator", ::startSimulationHandler) + .post("simulator/async", ::startSimulationHandler) + .get("simulator/:id", ::simulatorStatusHandler) + .get("healthcheck") { ctx -> + logger.info("Checking health") + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + private fun startSimulationHandler(ctx: Context) { + logger.info("Starting asynchronous scenario") + ctx.request.body.then { body -> + val id = startSimulation(body) + ctx.response.sendEitherErrorOrResponse(id) + } + } + + private fun startSimulation(body: TypedData): Either<ParsingError, Response> { + return xnfSimulator.startSimulation(body.inputStream) + .map(ongoingSimulations::startAsynchronousSimulation) + .map(Responses::acceptedResponse) + } + + private fun simulatorStatusHandler(ctx: Context) { + val id = UUID.fromString(ctx.pathTokens["id"]) + val status = ongoingSimulations.status(id) + val response = Responses.statusResponse(status.toString(), status.message) + ctx.response.sendAndHandleErrors(IO.just(response)) + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 999d0327..56d6212a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import arrow.core.ForOption import arrow.core.Option diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 708ffd13..9b6ef209 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration @@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -internal data class SimulatorConfiguration( +data class SimulatorConfiguration( val listenPort: Int, val vesHost: String, val vesPort: Int, diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt new file mode 100644 index 00000000..95bb4897 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import arrow.effects.IO +import kotlinx.coroutines.experimental.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { + private val asyncSimulationContext = executor.asCoroutineDispatcher() + private val simulations = ConcurrentHashMap<UUID, Status>() + + fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { + val id = UUID.randomUUID() + simulations[id] = StatusOngoing + + simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> + result.fold( + { err -> + logger.warn("Error", err) + simulations[id] = StatusFailure(err) + }, + { + logger.info("Finished sending messages") + simulations[id] = StatusSuccess + } + ) + } + return id + } + + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() { + simulations.clear() + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} + +sealed class Status(val message: String) { + override fun toString() = this::class.simpleName ?: "null" +} + +object StatusNotFound : Status("not found") +object StatusOngoing : Status("ongoing") +object StatusSuccess : Status("success") +data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}") diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index fa6d626b..c9e900ac 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.void +import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map {config -> - XnfSimulator(config) - .let { HttpServer(it) } + .map { config -> + XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations()) .start(config.listenPort) - .void() + .unit() } .unsafeRunEitherSync( { ex -> diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt new file mode 100644 index 00000000..70d8ba83 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.effects.IO +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess +import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import java.time.Duration +import java.util.* +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class OngoingSimulationsTest : Spek({ + val executor = Executors.newSingleThreadExecutor() + val cut = OngoingSimulations(executor) + + describe("simulations repository") { + given("not existing task task id") { + val id = UUID.randomUUID() + + on("status") { + val result = cut.status(id) + + it("should have 'not found' status") { + assertThat(result).isEqualTo(StatusNotFound) + } + } + } + + given("never ending task") { + val task = IO.async<Unit> { } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have ongoing status") { + assertThat(cut.status(result)).isEqualTo(StatusOngoing) + } + } + } + + given("failing task") { + val cause = RuntimeException("facepalm") + val task = IO.raiseError<Unit>(cause) + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have failing status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusFailure(cause)) + } + } + } + } + + given("successful task") { + val task = IO { println("great success!") } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have successful status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusSuccess) + } + } + } + } + + afterGroup { + executor.shutdown() + } + } + + afterEachTest { cut.clear() } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt new file mode 100644 index 00000000..80f39579 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Left +import arrow.core.None +import arrow.core.Right +import arrow.effects.IO +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class XnfSimulatorTest : Spek({ + lateinit var cut: XnfSimulator + lateinit var vesClient: VesHvClient + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + + beforeEachTest { + vesClient = mock() + messageParametersParser = mock() + messageGenerator = mock() + cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator) + } + + describe("startSimulation") { + it("should fail when empty input stream") { + // given + val emptyInputStream = ByteInputStream() + + // when + val result = cut.startSimulation(emptyInputStream) + + // then + assertThat(result).isLeft() + } + + it("should fail when invalid JSON") { + // given + val invalidJson = "invalid json".byteInputStream() + + // when + val result = cut.startSimulation(invalidJson) + + // then + assertThat(result).isLeft() + } + + it("should fail when JSON syntax is valid but content is invalid") { + // given + val json = "[1,2,3]".byteInputStream() + val cause = ParsingError("epic fail", None) + whenever(messageParametersParser.parse(any())).thenReturn( + Left(cause)) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).left().isEqualTo(cause) + } + + it("should return generated messages") { + // given + val json = "[true]".byteInputStream() + val messageParams = listOf<MessageParameters>() + val generatedMessages = Flux.empty<PayloadWireFrameMessage>() + val sendingIo = IO {} + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) + whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).right().isSameAs(sendingIo) + } + } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt index 8749dc5b..69caf727 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt @@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError @@ -65,7 +65,7 @@ <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version> <jacoco.version>0.8.2</jacoco.version> - <jacoco.minimum.coverage>60</jacoco.minimum.coverage> + <jacoco.minimum.coverage>66</jacoco.minimum.coverage> <!-- Protocol buffers --> <protobuf.version>3.5.1</protobuf.version> @@ -445,6 +445,7 @@ <https_proxy>${docker.http_proxy}</https_proxy> </args> --> + <dockerFileDir>${project.basedir}</dockerFileDir> <tags> <tag>${project.version}-${maven.build.timestamp}Z</tag> @@ -525,6 +526,11 @@ <version>${kotlin.version}</version> </dependency> <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + <version>0.25.0</version> + </dependency> + <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> <version>${arrow.version}</version> |