aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-utils/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-utils/src/main')
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt32
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt25
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt15
3 files changed, 69 insertions, 3 deletions
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index bedc2fcd..d5b33b91 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -20,10 +20,20 @@
package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
+import arrow.core.ForOption
import arrow.core.Option
import arrow.core.Try
+import arrow.core.fix
import arrow.core.identity
+import arrow.effects.ForIO
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.instances.option.monad.monad
import arrow.syntax.collections.firstOption
+import arrow.typeclasses.MonadContinuation
+import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicReference
/**
@@ -31,12 +41,24 @@ import java.util.concurrent.atomic.AtomicReference
* @since July 2018
*/
+object OptionUtils {
+ fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A)
+ : Option<A> = Option.monad().binding(c).fix()
+}
+
+object IOUtils {
+ fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A)
+ : IO<A> = IO.monad().binding(c).fix()
+}
+
fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
+
fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
@@ -57,3 +79,13 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply {
action(exception)
}
}
+
+fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B)
+ : Option<B> = let { OptionUtils.binding { c(it) } }
+
+
+
+
+
+
+
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
new file mode 100644
index 00000000..aaa598d2
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
@@ -0,0 +1,25 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then() \ No newline at end of file
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
index 87aea41e..cc940907 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
@@ -19,10 +19,19 @@
*/
package org.onap.dcae.collectors.veshv.utils
+import java.util.concurrent.atomic.AtomicReference
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
-fun registerShutdownHook(job: () -> Unit) =
- Runtime.getRuntime()
- .addShutdownHook(Thread({ job() }, "GracefulShutdownThread"))
+
+private val currentShutdownHook = AtomicReference<Thread>()
+
+fun registerShutdownHook(job: () -> Unit) {
+ val runtime = Runtime.getRuntime()
+ val newShutdownHook = Thread({ job() }, "GracefulShutdownThread")
+ currentShutdownHook.get()?.run(runtime::removeShutdownHook)
+ currentShutdownHook.set(newShutdownHook)
+ runtime.addShutdownHook(newShutdownHook)
+}