summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-utils')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt55
1 files changed, 29 insertions, 26 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
index cf338a70..f133d630 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
@@ -20,50 +20,53 @@
package org.onap.dcae.collectors.veshv.utils.http
import arrow.core.Either
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
import reactor.netty.NettyOutbound
import reactor.netty.http.server.HttpServerResponse
import javax.json.Json
private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty")
-fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound =
- sendAndHandleErrors(action.map {
- Response(
- HttpStatus.OK,
- Content(
- ContentType.JSON,
- Json.createObjectBuilder().add("response", "Request accepted").build()
+fun HttpServerResponse.sendOrError(action: ()->Unit) = sendAndHandleErrors(
+ Mono
+ .fromSupplier(action)
+ .map {
+ Response(
+ HttpStatus.OK,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("response", "Request accepted").build()
+ )
)
- )
- })
-
+ }
+)
-fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound =
- response.attempt().unsafeRunSync().fold(
- { err ->
- logger.withWarn { log("Error occurred. Sending .", err) }
- val message = err.message
- sendResponse(errorResponse(message))
- },
- {
- sendResponse(it)
+fun HttpServerResponse.sendAndHandleErrors(response: Mono<Response>) =
+ response
+ .onErrorResume {
+ logger.withWarn { log("Error occurred. Sending .", it) }
+ errorResponse(it.localizedMessage).toMono()
+ }
+ .flatMap {
+ sendResponse(it).then()
}
- )
-fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>): NettyOutbound =
+fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>) =
when (response) {
- is Either.Left -> sendResponse(errorResponse(response.a.toString()))
- is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+ is Either.Left -> sendResponse(errorResponse(response.a.toString())).then()
+ is Either.Right -> sendAndHandleErrors(Mono.just(response.b))
}
-private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
+
+fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
val respWithStatus = status(response.status.number)
val responseContent = response.content
- return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() }))
+ return respWithStatus.sendString(
+ Mono.just(responseContent.serializer.run { responseContent.value.show() })
+ )
}
private fun errorResponse(message: String?): Response =