diff options
Diffstat (limited to 'sources/hv-collector-utils')
-rw-r--r-- | sources/hv-collector-utils/pom.xml | 12 | ||||
-rw-r--r-- | sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt | 12 | ||||
-rw-r--r-- | sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt | 3 | ||||
-rw-r--r-- | sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt (renamed from sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt) | 45 | ||||
-rw-r--r-- | sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt | 33 | ||||
-rw-r--r-- | sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt | 9 |
6 files changed, 48 insertions, 66 deletions
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index e85b8ee4..5053cf00 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -65,26 +65,22 @@ <artifactId>kotlin-reflect</artifactId> </dependency> <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-instances-data</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> + <artifactId>arrow-typeclasses</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects-instances</artifactId> + <artifactId>arrow-core-extensions</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> </dependency> <dependency> - <groupId>org.jetbrains.kotlinx</groupId> - <artifactId>kotlinx-coroutines-core</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <optional>true</optional> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index 47b3d559..cfed7f32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -23,16 +23,11 @@ import arrow.core.Either import arrow.core.ForOption import arrow.core.Option import arrow.core.Try +import arrow.core.extensions.option.monad.monad import arrow.core.fix import arrow.core.identity -import arrow.effects.ForIO -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monad.monad -import arrow.instances.option.monad.monad import arrow.syntax.collections.firstOption import arrow.typeclasses.MonadContinuation -import arrow.typeclasses.binding import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.util.concurrent.atomic.AtomicReference @@ -47,11 +42,6 @@ object OptionUtils { : Option<A> = Option.monad().binding(c).fix() } -object IOUtils { - fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A) - : IO<A> = IO.monad().binding(c).fix() -} - fun <A> Either<A, A>.flatten() = fold(::identity, ::identity) fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt index 9023528e..ac39100d 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt @@ -28,7 +28,8 @@ sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<Str object Entry : Marker(ENTRY) object Exit : Marker(EXIT) - class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) { + class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : + Marker(INVOKE, mdc(id, timestamp)) { companion object { private fun mdc(id: UUID, timestamp: Instant) = mapOf( OnapMdc.INVOCATION_ID to id.toString(), diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt index 56825221..58859462 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt @@ -17,16 +17,8 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.arrow +package org.onap.dcae.collectors.veshv.utils.process -import arrow.core.Either -import arrow.core.Left -import arrow.core.Right -import arrow.effects.IO -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.toMono import kotlin.system.exitProcess /** @@ -37,9 +29,8 @@ import kotlin.system.exitProcess sealed class ExitCode { abstract val code: Int - fun io() = IO { - exitProcess(code) - } + fun doExit(): Nothing = exitProcess(code) + } object ExitSuccess : ExitCode() { @@ -47,33 +38,3 @@ object ExitSuccess : ExitCode() { } data class ExitFailure(override val code: Int) : ExitCode() - -inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) = - flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() }) - -fun IO<Any>.unit() = map { Unit } - -fun <T> Mono<T>.asIo() = IO.async<T> { callback -> - subscribe({ - callback(Right(it)) - }, { - callback(Left(it)) - }) -} - -fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = - toMono().then(Mono.fromCallable(callback)) - -fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> = - flatMap { io -> - io.attempt().unsafeRunSync().fold( - { Flux.error<T>(it) }, - { Flux.just<T>(it) } - ) - } - -inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> = - map { - block(it) - it - } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt new file mode 100644 index 00000000..ceccbcba --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +package org.onap.dcae.collectors.veshv.utils.rx + +import org.reactivestreams.Publisher +import reactor.core.publisher.Mono +import reactor.core.publisher.toMono + +fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = + toMono().then(Mono.fromCallable(callback)) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 670ab4ac..728d62bb 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.netty.DisposableServer @@ -29,7 +28,7 @@ import reactor.netty.DisposableServer * @since August 2018 */ abstract class ServerHandle(val host: String, val port: Int) : Closeable { - abstract fun await(): IO<Unit> + abstract fun await(): Mono<Void> } /** @@ -58,8 +57,10 @@ class NettyServerHandle(private val ctx: DisposableServer, } } - override fun await() = IO<Unit> { - ctx.channel().closeFuture().sync() + override fun await(): Mono<Void> = Mono.create { callback -> + ctx.channel().closeFuture().addListener { + callback.success() + } } companion object { |