summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml6
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt12
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt (renamed from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt)92
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt9
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt3
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt13
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt2
-rw-r--r--hv-collector-main/src/main/resources/logback.xml6
-rw-r--r--hv-collector-test-utils/pom.xml5
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt62
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt (renamed from hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt)38
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt (renamed from hv-collector-test-utils/src/main/kotlin/configurations.kt)0
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt (renamed from hv-collector-test-utils/src/main/kotlin/messages.kt)0
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt (renamed from hv-collector-test-utils/src/main/kotlin/vesEvents.kt)0
-rw-r--r--hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker (renamed from hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker)0
-rw-r--r--hv-collector-utils/pom.xml21
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt4
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt10
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt81
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt77
-rw-r--r--hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt101
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt17
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt21
-rw-r--r--hv-collector-xnf-simulator/pom.xml4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt102
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt122
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt115
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt95
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt)2
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt (renamed from hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt)4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt76
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt15
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt107
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt114
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt6
-rw-r--r--pom.xml8
38 files changed, 1037 insertions, 321 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 33aedeca..f9f52b4e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -28,7 +28,7 @@ services:
command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
ves-hv-collector:
- image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
# build:
# context: hv-collector-main
# dockerfile: Dockerfile
@@ -51,7 +51,7 @@ services:
- ./ssl/:/etc/ves-hv/
xnf-simulator:
- image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
# build:
# context: hv-collector-xnf-simulator
# dockerfile: Dockerfile
@@ -64,7 +64,7 @@ services:
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
- image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
+ image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
# build:
# context: hv-collector-dcae-app-simulator
# dockerfile: Dockerfile
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 1e22d4c0..ba29844a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -149,7 +149,7 @@ object PerformanceSpecification : Spek({
val outputDigest = digest.digest()
- assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
+ assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
assertThat(outputDigest).isEqualTo(inputDigest)
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 239f7102..354edaeb 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -54,10 +54,14 @@ class MessageStreamValidation(
val expectations = Json.createReader(input).readArray()
val messageParams = messageParametersParser.parse(expectations)
- if (messageParams.isEmpty())
- throw IllegalArgumentException("Message param list cannot be empty")
-
- return messageParams
+ return messageParams.fold(
+ { throw IllegalArgumentException("Parsing error: " + it.message) },
+ {
+ if (it.isEmpty())
+ throw IllegalArgumentException("Message param list cannot be empty")
+ it
+ }
+ )
}
private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index 6c830b9d..1eca9317 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -19,18 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-import arrow.core.Left
-import arrow.core.Right
import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.monad
-import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import ratpack.exec.Promise
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.http.Response
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
@@ -38,7 +34,21 @@ import ratpack.server.ServerConfig
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val simulator: DcaeAppSimulator) {
+class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
+ private val responseValid by lazy {
+ Responses.statusResponse(
+ name = "valid",
+ message = "validation succeeded"
+ )
+ }
+
+ private val responseInvalid by lazy {
+ Responses.statusResponse(
+ name = "invalid",
+ message = "validation failed",
+ httpStatus = HttpStatus.BAD_REQUEST
+ )
+ }
fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
@@ -52,10 +62,10 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
private fun setupHandlers(chain: Chain) {
chain
.put("configuration/topics") { ctx ->
- val operation = ctx.bodyIo().flatMap { body ->
- simulator.listenToTopics(body.text)
+ ctx.request.body.then { body ->
+ val operation = simulator.listenToTopics(body.text)
+ ctx.response.sendOrError(operation)
}
- ctx.response.sendOrError(operation)
}
.delete("messages") { ctx ->
@@ -64,7 +74,7 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
}
.get("messages/all/count") { ctx ->
simulator.state().fold(
- { ctx.response.status(STATUS_NOT_FOUND) },
+ { ctx.response.status(HttpConstants.STATUS_NOT_FOUND) },
{
ctx.response
.contentType(CONTENT_TEXT)
@@ -72,58 +82,20 @@ class ApiServer(private val simulator: DcaeAppSimulator) {
})
}
.post("messages/all/validate") { ctx ->
- val responseStatus = IO.monad().binding {
- val body = ctx.bodyIo().bind()
- val isValid = simulator.validate(body.inputStream).bind()
- if (isValid)
- STATUS_OK
- else
- STATUS_BAD_REQUEST
- }.fix()
-
- ctx.response.sendStatusOrError(responseStatus)
+ ctx.request.body.then { body ->
+ val response = simulator.validate(body.inputStream)
+ .map { isValid ->
+ if (isValid) responseValid else responseInvalid
+ }
+ ctx.response.sendAndHandleErrors(response)
+ }
}
.get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
+ ctx.response.status(HttpConstants.STATUS_OK).send()
}
}
- private fun Context.bodyIo() = request.body.asIo()
-
- private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
- onError {
- emitResult(Left(it))
- }.then { result ->
- emitResult(Right(result))
- }
- }
-
- private fun Response.sendOrError(responseStatus: IO<Unit>) {
- sendStatusOrError(responseStatus.map { STATUS_OK })
- }
-
- private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
- responseStatus.unsafeRunAsync { cb ->
- cb.fold(
- { err ->
- logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
- status(ApiServer.STATUS_INTERNAL_SERVER_ERROR)
- .send(CONTENT_TEXT, err.message)
- },
- {
- status(it).send()
- }
- )
- }
- }
-
companion object {
- private val logger = Logger(ApiServer::class)
private const val CONTENT_TEXT = "text/plain"
-
- private const val STATUS_OK = 200
- private const val STATUS_BAD_REQUEST = 400
- private const val STATUS_NOT_FOUND = 404
- private const val STATUS_INTERNAL_SERVER_ERROR = 500
}
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index a65a2686..c0f8b340 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -24,11 +24,10 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -52,7 +51,7 @@ fun main(args: Array<String>) =
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+ return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
.start(config.apiPort, config.kafkaTopics)
- .void()
+ .unit()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 0bdd1159..2932367b 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.core.Either
import arrow.core.Left
import arrow.core.None
+import arrow.core.Right
import arrow.core.Some
import arrow.effects.IO
import javax.json.stream.JsonParsingException
@@ -67,7 +68,7 @@ internal class MessageStreamValidationTest : Spek({
}
fun givenParsedMessageParameters(vararg params: MessageParameters) {
- whenever(messageParametersParser.parse(any())).thenReturn(params.toList())
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(params.toList()))
}
describe("validate") {
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
index 79fc9321..1adf0cad 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-enum class HealthStatus(val httpResponseStatus: Int) {
- UP(OK),
- DOWN(SERVICE_UNAVAILABLE),
- OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
- UNKNOWN(SERVICE_UNAVAILABLE)
+enum class HealthStatus(val httpResponseStatus: HttpStatus) {
+ UP(HttpStatus.OK),
+ DOWN(HttpStatus.SERVICE_UNAVAILABLE),
+ OUT_OF_SERVICE(HttpStatus.SERVICE_UNAVAILABLE),
+ UNKNOWN(HttpStatus.SERVICE_UNAVAILABLE)
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 7e9efac7..753f73ef 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -51,7 +51,7 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por
private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
healthDescription.get().run {
- resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+ resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
}
private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
index 5127e7ef..a0235e17 100644
--- a/hv-collector-main/src/main/resources/logback.xml
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -27,9 +27,9 @@
</appender>
<logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
<!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
<root level="INFO">
diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml
index 3960e399..3b6c0e89 100644
--- a/hv-collector-test-utils/pom.xml
+++ b/hv-collector-test-utils/pom.xml
@@ -51,5 +51,10 @@
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt
new file mode 100644
index 00000000..54913744
--- /dev/null
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/arrow.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import arrow.core.identity
+import org.assertj.core.api.AbstractAssert
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.ObjectAssert
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+class EitherAssert<A, B>(actual: Either<A, B>)
+ : AbstractAssert<EitherAssert<A, B>, Either<A, B>>(actual, EitherAssert::class.java) {
+
+ fun isLeft(): EitherAssert<A, B> {
+ isNotNull()
+ isInstanceOf(Either.Left::class.java)
+ return myself
+ }
+
+ fun left(): ObjectAssert<A> {
+ isLeft()
+ val left = actual.fold(
+ ::identity,
+ { throw AssertionError("should be left") })
+ return assertThat(left)
+ }
+
+ fun isRight(): EitherAssert<A, B> {
+ isNotNull()
+ isInstanceOf(Either.Right::class.java)
+ return myself
+ }
+
+ fun right(): ObjectAssert<B> {
+ isRight()
+ val right = actual.fold(
+ { throw AssertionError("should be right") },
+ ::identity)
+ return assertThat(right)
+ }
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
index 081dd0da..d017b31b 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
@@ -17,15 +17,39 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.utils.http
+package org.onap.dcae.collectors.veshv.tests.utils
+
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
*/
-class Status {
- companion object {
- const val OK = 200
- const val SERVICE_UNAVAILABLE = 503
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
+
+object Assertions : org.assertj.core.api.Assertions() {
+ fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+}
+
+
+fun waitUntilSucceeds(action: () -> Unit) = waitUntilSucceeds(50, Duration.ofMillis(10), action)
+
+fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
+ var tryNum = 0
+ while (tryNum <= retries) {
+ tryNum++
+ try {
+ logger.debug("Try number $tryNum")
+ action()
+ break
+ } catch (ex: Throwable) {
+ if (tryNum >= retries)
+ throw ex
+ else
+ Thread.sleep(sleepTime.toMillis())
+ }
}
}
diff --git a/hv-collector-test-utils/src/main/kotlin/configurations.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt
index 57843b45..57843b45 100644
--- a/hv-collector-test-utils/src/main/kotlin/configurations.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/configurations.kt
diff --git a/hv-collector-test-utils/src/main/kotlin/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index c6aa89b2..c6aa89b2 100644
--- a/hv-collector-test-utils/src/main/kotlin/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
diff --git a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index 6aeb6206..6aeb6206 100644
--- a/hv-collector-test-utils/src/main/kotlin/vesEvents.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
diff --git a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker
index ca6ee9ce..ca6ee9ce 100644
--- a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
+++ b/hv-collector-test-utils/src/main/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index f1b7f061..81daf9b2 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -85,6 +85,20 @@
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -121,7 +135,10 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
-
-
</project> \ No newline at end of file
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index 2d538b72..a99fef5e 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -35,6 +35,10 @@ import java.util.concurrent.atomic.AtomicReference
fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
+fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
+
+fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+
fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index cef537e8..05d13094 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -50,10 +50,14 @@ fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitC
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
-fun IO<Any>.void() = map { Unit }
+fun IO<Any>.unit() = map { Unit }
-fun <T> Mono<T>.asIo() = IO.async<T> { proc ->
- subscribe({ proc(Right(it)) }, { proc(Left(it)) })
+fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
+ subscribe({
+ callback(Right(it))
+ }, {
+ callback(Left(it))
+ })
}
fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
new file mode 100644
index 00000000..c5c46397
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/http.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.typeclasses.Show
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+object HttpConstants {
+ const val STATUS_OK = 200
+ const val STATUS_ACCEPTED = 202
+ const val STATUS_BAD_REQUEST = 400
+ const val STATUS_NOT_FOUND = 404
+ const val STATUS_INTERNAL_SERVER_ERROR = 500
+ const val STATUS_SERVICE_UNAVAILABLE = 503
+
+ const val CONTENT_TYPE_JSON = "application/json"
+ const val CONTENT_TYPE_TEXT = "text/plain"
+}
+
+enum class HttpStatus(val number: Int) {
+ OK(HttpConstants.STATUS_OK),
+ ACCEPTED(HttpConstants.STATUS_ACCEPTED),
+ BAD_REQUEST(HttpConstants.STATUS_BAD_REQUEST),
+ NOT_FOUND(HttpConstants.STATUS_NOT_FOUND),
+ INTERNAL_SERVER_ERROR(HttpConstants.STATUS_INTERNAL_SERVER_ERROR),
+ SERVICE_UNAVAILABLE(HttpConstants.STATUS_SERVICE_UNAVAILABLE)
+}
+
+
+enum class ContentType(val value: String) {
+ JSON(HttpConstants.CONTENT_TYPE_JSON),
+ TEXT(HttpConstants.CONTENT_TYPE_TEXT)
+}
+
+data class Response(val status: HttpStatus, val content: Content<Any>)
+data class Content<T>(val type: ContentType, val value: T, val serializer: Show<T> = Show.any())
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+object Responses {
+
+ fun acceptedResponse(id: UUID): Response {
+ return Response(
+ HttpStatus.ACCEPTED,
+ Content(ContentType.TEXT, id)
+ )
+ }
+
+ fun statusResponse(name: String, message: String, httpStatus: HttpStatus = HttpStatus.OK): Response {
+ return Response(httpStatus,
+ Content(ContentType.JSON,
+ Json.createObjectBuilder()
+ .add("status", name)
+ .add("message", message)
+ .build()))
+ }
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
new file mode 100644
index 00000000..0282d0c7
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import javax.json.Json
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
+
+fun ratpack.http.Response.sendOrError(action: IO<Unit>) {
+ sendAndHandleErrors(action.map {
+ Response(
+ HttpStatus.OK,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("response", "Request accepted").build()))
+ })
+}
+
+fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
+ when(response) {
+ is Either.Left -> send(errorResponse(response.a.toString()))
+ is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+ }
+}
+
+fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
+ response.attempt().unsafeRunSync().fold(
+ { err ->
+ logger.warn("Error occurred. Sending .", err)
+ val message = err.message
+ send(errorResponse(message))
+ },
+ ::send
+ )
+}
+
+private fun errorResponse(message: String?): Response {
+ return Response(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("error", message).build()))
+}
+
+fun ratpack.http.Response.send(response: Response) {
+ val respWithStatus = status(response.status.number)
+ response.content.apply {
+ respWithStatus.send(
+ type.value,
+ serializer.run { value.show() })
+ }
+}
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt
new file mode 100644
index 00000000..f9f716a1
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/http/ResponsesTest.kt
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.*
+import javax.json.JsonObject
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class ResponsesTest : Spek({
+ describe("response factory") {
+ describe("accepted response") {
+ given("uuid") {
+ val uuid = UUID.randomUUID()
+
+ on("calling acceptedResponse") {
+ val result = Responses.acceptedResponse(uuid)
+
+ it ("should have ACCEPTED status") {
+ assertThat(result.status).isEqualTo(HttpStatus.ACCEPTED)
+ }
+
+ it ("should have text body") {
+ assertThat(result.content.type).isEqualTo(ContentType.TEXT)
+ }
+
+ it ("should contain UUID text in the body") {
+ val serialized = result.content.serializer.run { result.content.value.show() }
+ assertThat(serialized).isEqualTo(uuid.toString())
+ }
+ }
+ }
+ }
+ describe("status response") {
+ given("all params are specified") {
+ val status = "ok"
+ val message = "good job"
+ val httpStatus = HttpStatus.OK
+
+ on("calling statusResponse") {
+ val result = Responses.statusResponse(status, message, httpStatus)
+ val json = result.content.value as JsonObject
+
+ it ("should have OK status") {
+ assertThat(result.status).isEqualTo(HttpStatus.OK)
+ }
+
+ it ("should have json body") {
+ assertThat(result.content.type).isEqualTo(ContentType.JSON)
+ }
+
+ it ("should contain status as string") {
+ assertThat(json.getString("status")).isEqualTo(status)
+ }
+
+ it ("should contain message") {
+ assertThat(json.getString("message")).isEqualTo(message)
+ }
+ }
+ }
+
+ given("default params are omitted") {
+ val status = "ok"
+ val message = "good job"
+
+ on("calling statusResponse") {
+ val result = Responses.statusResponse(status, message)
+
+ it ("should have OK status") {
+ assertThat(result.status).isEqualTo(HttpStatus.OK)
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
index 060f28a2..754fa31f 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParametersParser.kt
@@ -19,11 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
+import arrow.core.Either
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
import javax.json.JsonArray
interface MessageParametersParser {
- fun parse(request: JsonArray): List<MessageParameters>
+ fun parse(request: JsonArray): Either<ParsingError, List<MessageParameters>>
companion object {
val INSTANCE: MessageParametersParser by lazy {
@@ -31,3 +33,5 @@ interface MessageParametersParser {
}
}
}
+
+data class ParsingError(val message: String, val cause: Option<Throwable>)
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
index 5b328f1c..f3095618 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
@@ -19,9 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
+import arrow.core.Option
+import arrow.core.Try
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import javax.json.JsonArray
/**
@@ -32,8 +35,8 @@ internal class MessageParametersParserImpl(
private val commonEventHeaderParser: CommonEventHeaderParser = CommonEventHeaderParser()
) : MessageParametersParser {
- override fun parse(request: JsonArray): List<MessageParameters> =
- try {
+ override fun parse(request: JsonArray) =
+ Try {
request
.map { it.asJsonObject() }
.map {
@@ -41,13 +44,13 @@ internal class MessageParametersParserImpl(
.parse(it.getJsonObject("commonEventHeader"))
val messageType = MessageType.valueOf(it.getString("messageType"))
val messagesAmount = it.getJsonNumber("messagesAmount")?.longValue()
- ?: throw ParsingException("\"messagesAmount\" could not be parsed from message.",
- NullPointerException())
+ ?: throw NullPointerException("\"messagesAmount\" could not be parsed from message.")
MessageParameters(commonEventHeader, messageType, messagesAmount)
}
- } catch (e: Exception) {
- throw ParsingException("Parsing request body failed", e)
+ }.toEither().mapLeft { ex ->
+ ParsingError(
+ ex.message ?: "Unable to parse message parameters",
+ Option.fromNullable(ex))
}
- internal class ParsingException(message: String, cause: Exception) : Exception(message, cause)
}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
index 92561995..3b1a48b3 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
@@ -20,13 +20,12 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl.ParsingException
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
@@ -45,18 +44,20 @@ object MessageParametersParserTest : Spek({
it("should parse MessagesParameters object successfully") {
val result = messageParametersParser.parse(validMessagesParametesJson())
- assertThat(result).isNotNull
- assertThat(result).hasSize(2)
- val firstMessage = result.first()
- assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
- assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+ result.fold({ fail("should have succeeded") }) { rightResult ->
+ assertThat(rightResult).hasSize(2)
+ val firstMessage = rightResult.first()
+ assertThat(firstMessage.messageType).isEqualTo(MessageType.VALID)
+ assertThat(firstMessage.amount).isEqualTo(EXPECTED_MESSAGES_AMOUNT)
+
+ }
}
}
+
on("invalid parameters json") {
it("should throw exception") {
- assertThatExceptionOfType(ParsingException::class.java).isThrownBy {
- messageParametersParser.parse(invalidMessagesParametesJson())
- }
+ val result = messageParametersParser.parse(invalidMessagesParametesJson())
+ assertThat(result.isLeft()).describedAs("is left").isTrue()
}
}
}
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index d44e2511..cfe1dc14 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -106,6 +106,10 @@
<artifactId>arrow-effects</artifactId>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
deleted file mode 100644
index 02e6ee72..00000000
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.scheduler.Schedulers
-import javax.json.Json
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-internal class HttpServer(private val vesClient: XnfSimulator,
- private val messageParametersParser: MessageParametersParser = INSTANCE) {
-
- fun start(port: Int): IO<RatpackServer> = IO {
- RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(this::configureHandlers)
- }
- }
-
- private fun configureHandlers(chain: Chain) {
- chain
- .post("simulator/sync") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendIo(it) }
- .map { it.unsafeRunSync() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .post("simulator/async") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readArray() }
- .map { messageParametersParser.parse(it) }
- .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
- .map { vesClient.sendRx(it) }
- .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
- .onError { handleException(it, ctx) }
- .then { sendAcceptedResponse(ctx) }
- }
- .get("healthcheck") { ctx ->
- ctx.response.status(STATUS_OK).send()
- }
- }
-
- private fun sendAcceptedResponse(ctx: Context) {
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
-
- private fun handleException(t: Throwable, ctx: Context) {
- logger.warn("Failed to process the request - ${t.localizedMessage}")
- logger.debug("Exception thrown when processing the request", t)
- ctx.response
- .status(STATUS_BAD_REQUEST)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request was not accepted")
- .add("exception", t.localizedMessage)
- .build()
- .toString())
- }
-
- companion object {
- private val logger = Logger(HttpServer::class)
- const val STATUS_OK = 200
- const val STATUS_BAD_REQUEST = 400
- const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
- }
-}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index e8a474d0..558bd1c1 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -19,98 +19,40 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.core.Either
+import arrow.core.Some
+import arrow.core.Try
+import arrow.core.fix
+import arrow.core.flatMap
+import arrow.core.monad
import arrow.effects.IO
-import io.netty.handler.ssl.ClientAuth
-import io.netty.handler.ssl.SslContext
-import io.netty.handler.ssl.SslContextBuilder
-import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
-
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import java.io.InputStream
+import javax.json.Json
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
*/
-internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security))
- }
- .build()
-
- fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
- sendRx(messages).block()
- }
-
- fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .newHandler { _, output -> handler(complete, messages, output) }
- .doOnError {
- logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- .subscribe {
- logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<PayloadWireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(MAX_BATCH_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
- .then {
- logger.info("Messages have been sent")
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): SslContext =
- SslContextBuilder.forClient()
- .keyManager(config.cert.toFile(), config.privateKey.toFile())
- .trustManager(config.trustedCert.toFile())
- .sslProvider(SslProvider.OPENSSL)
- .clientAuth(ClientAuth.REQUIRE)
- .build()
-
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
-
- companion object {
- private val logger = Logger(XnfSimulator::class)
- private const val MAX_BATCH_SIZE = 128
- private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
- }
+class XnfSimulator(
+ private val vesClient: VesHvClient,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+ private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ Either.monad<ParsingError>().binding {
+ val json = parseJsonArray(messageParameters).bind()
+ val parsed = messageParametersParser.parse(json).bind()
+ val generatedMessages = messageGenerator.createMessageFlux(parsed)
+ vesClient.sendIo(generatedMessages)
+ }.fix()
+
+ private fun parseJsonArray(jsonStream: InputStream) =
+ Try {
+ Json.createReader(jsonStream).readArray()
+ }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
new file mode 100644
index 00000000..22e47d75
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpClient
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(private val configuration: SimulatorConfiguration) {
+
+ private val client: TcpClient = TcpClient.builder()
+ .options { opts ->
+ opts.host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .sslContext(createSslContext(configuration.security))
+ }
+ .build()
+
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+ sendRx(messages).then(Mono.just(Unit)).asIo()
+
+ private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .newHandler { _, output -> handler(complete, messages, output) }
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
+
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound): Publisher<Void> {
+
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
+ val frames = messages
+ .map(encoder::encode)
+ .window(MAX_BATCH_SIZE)
+
+ return nettyOutbound
+ .logConnectionClosed()
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
+ }
+
+ private fun createSslContext(config: SecurityConfiguration): SslContext =
+ SslContextBuilder.forClient()
+ .keyManager(config.cert.toFile(), config.privateKey.toFile())
+ .trustManager(config.trustedCert.toFile())
+ .sslProvider(SslProvider.OPENSSL)
+ .clientAuth(ClientAuth.REQUIRE)
+ .build()
+
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
+ companion object {
+ private val logger = Logger(VesHvClient::class)
+ private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
new file mode 100644
index 00000000..54ead6f7
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.http.Content
+import org.onap.dcae.collectors.veshv.utils.http.ContentType
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.TypedData
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import java.util.*
+import javax.json.Json
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class XnfApiServer(
+ private val xnfSimulator: XnfSimulator,
+ private val ongoingSimulations: OngoingSimulations) {
+
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
+ }
+ }
+
+ private fun configureHandlers(chain: Chain) {
+ chain
+ .post("simulator", ::startSimulationHandler)
+ .post("simulator/async", ::startSimulationHandler)
+ .get("simulator/:id", ::simulatorStatusHandler)
+ .get("healthcheck") { ctx ->
+ logger.info("Checking health")
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ private fun startSimulationHandler(ctx: Context) {
+ logger.info("Starting asynchronous scenario")
+ ctx.request.body.then { body ->
+ val id = startSimulation(body)
+ ctx.response.sendEitherErrorOrResponse(id)
+ }
+ }
+
+ private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
+ return xnfSimulator.startSimulation(body.inputStream)
+ .map(ongoingSimulations::startAsynchronousSimulation)
+ .map(Responses::acceptedResponse)
+ }
+
+ private fun simulatorStatusHandler(ctx: Context) {
+ val id = UUID.fromString(ctx.pathTokens["id"])
+ val status = ongoingSimulations.status(id)
+ val response = Responses.statusResponse(status.toString(), status.message)
+ ctx.response.sendAndHandleErrors(IO.just(response))
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 999d0327..56d6212a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import arrow.core.ForOption
import arrow.core.Option
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 708ffd13..9b6ef209 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.config
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-internal data class SimulatorConfiguration(
+data class SimulatorConfiguration(
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
new file mode 100644
index 00000000..95bb4897
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.effects.IO
+import kotlinx.coroutines.experimental.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+ private val asyncSimulationContext = executor.asCoroutineDispatcher()
+ private val simulations = ConcurrentHashMap<UUID, Status>()
+
+ fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ val id = UUID.randomUUID()
+ simulations[id] = StatusOngoing
+
+ simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
+ result.fold(
+ { err ->
+ logger.warn("Error", err)
+ simulations[id] = StatusFailure(err)
+ },
+ {
+ logger.info("Finished sending messages")
+ simulations[id] = StatusSuccess
+ }
+ )
+ }
+ return id
+ }
+
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() {
+ simulations.clear()
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
+
+sealed class Status(val message: String) {
+ override fun toString() = this::class.simpleName ?: "null"
+}
+
+object StatusNotFound : Status("not found")
+object StatusOngoing : Status("ongoing")
+object StatusSuccess : Status("success")
+data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index fa6d626b..c9e900ac 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
*/
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map {config ->
- XnfSimulator(config)
- .let { HttpServer(it) }
+ .map { config ->
+ XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
.start(config.listenPort)
- .void()
+ .unit()
}
.unsafeRunEitherSync(
{ ex ->
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
new file mode 100644
index 00000000..70d8ba83
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
+import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class OngoingSimulationsTest : Spek({
+ val executor = Executors.newSingleThreadExecutor()
+ val cut = OngoingSimulations(executor)
+
+ describe("simulations repository") {
+ given("not existing task task id") {
+ val id = UUID.randomUUID()
+
+ on("status") {
+ val result = cut.status(id)
+
+ it("should have 'not found' status") {
+ assertThat(result).isEqualTo(StatusNotFound)
+ }
+ }
+ }
+
+ given("never ending task") {
+ val task = IO.async<Unit> { }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have ongoing status") {
+ assertThat(cut.status(result)).isEqualTo(StatusOngoing)
+ }
+ }
+ }
+
+ given("failing task") {
+ val cause = RuntimeException("facepalm")
+ val task = IO.raiseError<Unit>(cause)
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have failing status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusFailure(cause))
+ }
+ }
+ }
+ }
+
+ given("successful task") {
+ val task = IO { println("great success!") }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have successful status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusSuccess)
+ }
+ }
+ }
+ }
+
+ afterGroup {
+ executor.shutdown()
+ }
+ }
+
+ afterEachTest { cut.clear() }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
new file mode 100644
index 00000000..80f39579
--- /dev/null
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.effects.IO
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class XnfSimulatorTest : Spek({
+ lateinit var cut: XnfSimulator
+ lateinit var vesClient: VesHvClient
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+
+ beforeEachTest {
+ vesClient = mock()
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ }
+
+ describe("startSimulation") {
+ it("should fail when empty input stream") {
+ // given
+ val emptyInputStream = ByteInputStream()
+
+ // when
+ val result = cut.startSimulation(emptyInputStream)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when invalid JSON") {
+ // given
+ val invalidJson = "invalid json".byteInputStream()
+
+ // when
+ val result = cut.startSimulation(invalidJson)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when JSON syntax is valid but content is invalid") {
+ // given
+ val json = "[1,2,3]".byteInputStream()
+ val cause = ParsingError("epic fail", None)
+ whenever(messageParametersParser.parse(any())).thenReturn(
+ Left(cause))
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).left().isEqualTo(cause)
+ }
+
+ it("should return generated messages") {
+ // given
+ val json = "[true]".byteInputStream()
+ val messageParams = listOf<MessageParameters>()
+ val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+ val sendingIo = IO {}
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
+ whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).right().isSameAs(sendingIo)
+ }
+ }
+})
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
index 8749dc5b..69caf727 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt
@@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
diff --git a/pom.xml b/pom.xml
index 03dac1f9..e1c90b3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
<jacoco.version>0.8.2</jacoco.version>
- <jacoco.minimum.coverage>60</jacoco.minimum.coverage>
+ <jacoco.minimum.coverage>66</jacoco.minimum.coverage>
<!-- Protocol buffers -->
<protobuf.version>3.5.1</protobuf.version>
@@ -445,6 +445,7 @@
<https_proxy>${docker.http_proxy}</https_proxy>
</args>
-->
+
<dockerFileDir>${project.basedir}</dockerFileDir>
<tags>
<tag>${project.version}-${maven.build.timestamp}Z</tag>
@@ -525,6 +526,11 @@
<version>${kotlin.version}</version>
</dependency>
<dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <version>0.25.0</version>
+ </dependency>
+ <dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
<version>${arrow.version}</version>