summaryrefslogtreecommitdiffstats
path: root/dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java
diff options
context:
space:
mode:
Diffstat (limited to 'dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java')
-rw-r--r--dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java257
1 files changed, 257 insertions, 0 deletions
diff --git a/dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java b/dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java
new file mode 100644
index 0000000..ffaf42b
--- /dev/null
+++ b/dcaedt_catalog/commons/src/main/java/org/onap/sdc/dcae/catalog/commons/Futures.java
@@ -0,0 +1,257 @@
+package org.onap.sdc.dcae.catalog.commons;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collections;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+
+import org.onap.sdc.common.onaplog.OnapLoggerDebug;
+import org.onap.sdc.common.onaplog.OnapLoggerError;
+import org.onap.sdc.dcae.catalog.commons.Future;
+import org.onap.sdc.dcae.catalog.commons.FutureHandler;
+import org.onap.sdc.common.onaplog.Enums.LogLevel;
+
+
+/**
+ */
+public class Futures<T> {
+
+ private Futures() {
+ }
+
+
+ public static <T> Future<T> failedFuture(Throwable theError) {
+ return new BasicFuture<T>()
+ .cause(theError);
+ }
+
+ public static <T> Future<T> succeededFuture(T theResult) {
+ return new BasicFuture<T>()
+ .result(theResult);
+ }
+
+ public static <T> Future<T> future() {
+ return new BasicFuture<T>();
+ }
+
+ public static <U,V> Future<V> advance(Future<U> theStep,
+ final Function<U,V> theResultFunction) {
+ return advance(theStep, theResultFunction, Function.identity());
+ }
+
+ public static <U,V> Future<V> advance(Future<U> theStep,
+ final Function<U,V> theResultFunction,
+ final Function<Throwable, Throwable> theErrorFunction) {
+ final Future<V> adv = new BasicFuture<V>();
+ theStep.setHandler(new FutureHandler<U>() {
+ public void handle(Future<U> theResult) {
+ if (theResult.failed())
+ adv.cause(theErrorFunction.apply(theResult.cause()));
+ else
+ adv.result(theResultFunction.apply(theResult.result()));
+ }
+ });
+ return adv;
+ }
+
+ /** */
+ public static class BasicFuture<T> implements Future<T> {
+
+ protected boolean succeeded,
+ failed;
+
+ protected FutureHandler<T> handler;
+ protected Throwable cause;
+ protected T result;
+
+
+ protected BasicFuture() {
+ }
+
+ public T result() {
+ return this.result;
+ }
+
+ public Future<T> result(T theResult) {
+ this.result = theResult;
+ this.succeeded = true;
+ this.cause = null;
+ this.failed = false;
+ callHandler();
+ return this;
+ }
+
+ public Throwable cause() {
+ return this.cause;
+ }
+
+ public Future<T> cause(Throwable theCause) {
+ this.cause = theCause;
+ this.failed = true;
+ this.result = null;
+ this.succeeded = false;
+ callHandler();
+ return this;
+ }
+
+ public boolean succeeded() {
+ return this.succeeded;
+ }
+
+ public boolean failed() {
+ return this.failed;
+ }
+
+ public boolean complete() {
+ return this.failed || this.succeeded;
+ }
+
+ public Future<T> setHandler(FutureHandler<T> theHandler) {
+ this.handler = theHandler;
+ callHandler();
+ return this;
+ }
+
+ public T waitForResult() throws Exception {
+ BasicHandler<T> hnd = buildHandler();
+ setHandler(hnd);
+ hnd.waitForCompletion();
+ if (failed())
+ throw (Exception)cause();
+ else
+ return result();
+ }
+
+ public Future<T> waitForCompletion() throws InterruptedException {
+ BasicHandler<T> hnd = buildHandler();
+ setHandler(hnd);
+ hnd.waitForCompletion();
+ return this;
+ }
+
+ protected void callHandler() {
+ if (this.handler != null && complete()) {
+ this.handler.handle(this);
+ }
+ }
+
+ protected BasicHandler<T> buildHandler() {
+ return new BasicHandler<T>();
+ }
+ }
+
+
+ /** */
+ public static class BasicHandler<T>
+ implements FutureHandler<T> {
+
+ protected T result = null;
+ protected Throwable error = null;
+ protected CountDownLatch latch = null;
+
+ BasicHandler() {
+ this(new CountDownLatch(1));
+ }
+
+ BasicHandler(CountDownLatch theLatch) {
+ this.latch = theLatch;
+ }
+
+ public void handle(Future<T> theResult) {
+ process(theResult);
+ if (this.latch != null) {
+ this.latch.countDown();
+ }
+ }
+
+ protected void process(Future<T> theResult) {
+ if (theResult.failed()) {
+ this.error = theResult.cause();
+ }
+ else {
+ this.result = theResult.result();
+ }
+ }
+
+ public T result(boolean doWait)
+ throws InterruptedException, RuntimeException {
+ if (doWait) {
+ waitForCompletion();
+ }
+ if (null == this.error)
+ return this.result;
+
+ throw new RuntimeException(this.error);
+ }
+
+ public T result()
+ throws InterruptedException, RuntimeException {
+ return result(true);
+ }
+
+ public BasicHandler<T> waitForCompletion() throws InterruptedException {
+ this.latch.await();
+ return this;
+ }
+ }
+
+ /** */
+ public static class Accumulator<T> extends BasicFuture<List<T>>
+ implements Future<List<T>> {
+
+ protected List<Future<T>> futures = new LinkedList<Future<T>>();
+ //protected List<T> results = new LinkedList<T>();
+ protected BasicHandler<T> handler = null;
+
+ private static OnapLoggerError errLogger = OnapLoggerError.getInstance();
+ private static OnapLoggerDebug debugLogger = OnapLoggerDebug.getInstance();
+
+ public Accumulator() {
+ this.result = new LinkedList<T>();
+ }
+
+ public Accumulator<T> add(Future<T> theFuture) {
+ debugLogger.log(LogLevel.DEBUG, this.getClass().getName(), "Intersection add");
+ this.futures.add(theFuture);
+ this.result.add(null);
+ return this;
+ }
+
+ public Accumulator<T> addAll(Accumulator<T> theFutures) {
+
+ debugLogger.log(LogLevel.DEBUG, this.getClass().getName(), "Intersection addAll");
+
+ return this;
+ }
+
+ public Future<List<T>> accumulate() {
+ this.futures = Collections.unmodifiableList(this.futures);
+ this.handler = new BasicHandler<T>(new CountDownLatch(this.futures.size())) {
+ protected void process(Future<T> theResult) {
+ if (theResult.failed()) {
+ Accumulator.this.cause = theResult.cause();
+ }
+ else {
+ Accumulator.this.result.set(
+ Accumulator.this.futures.indexOf(theResult), theResult.result());
+ }
+ if (this.latch.getCount() == 1) {
+ if (Accumulator.this.cause != null)
+ Accumulator.this.cause(Accumulator.this.cause);
+ else
+ Accumulator.this.result(Accumulator.this.result);
+ }
+ }
+ };
+ futures.stream()
+ .forEach(f -> f.setHandler(this.handler));
+
+ return this;
+ }
+
+ }
+
+
+}