aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-02-28 12:14:17 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-02-28 12:34:32 +0100
commit33a2978ce2114d860d73e41084b8e917c4233118 (patch)
tree4759d707ac97094d8975c1d176086090971dcd9e /rest-services/cbs-client/src
parent5e6c996472969e50b3da60a06559c3231218637e (diff)
Emit CBS config updates only
Add an update method to CbsClient which emits an item only when the configuration has accentually changed. Change-Id: I6023fb1cc069b06bd2c4baf94406538965b6534c Issue-ID: DCAEGEN2-1233 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src')
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java42
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java2
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java22
7 files changed, 70 insertions, 4 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java
index b9a6e40d..3ee12eed 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java
@@ -19,15 +19,16 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.time.Duration;
import java.util.UUID;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import org.jetbrains.annotations.NotNull;
/**
* <p>Main Config Binding Service client interface.</p>
@@ -39,7 +40,9 @@ import org.jetbrains.annotations.NotNull;
public interface CbsClient {
/**
- * Get reactive configuration stream.
+ * <p>
+ * Get current application configuration.
+ *
* <p>
* Returns a {@link Mono} that publishes new configuration after CBS client retrieves one.
*
@@ -50,11 +53,14 @@ public interface CbsClient {
@NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext);
/**
+ * <p>
* Poll for configuration.
*
+ * <p>
* Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be
* changed, ie. items in the stream might be the same until change is made in CBS.
*
+ * @param diagnosticContext diagnostic context as defined in Logging Guideline
* @param initialDelay delay after first request attempt
* @param period frequency of update checks
* @return stream of configuration states
@@ -64,4 +70,34 @@ public interface CbsClient {
.map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID()))
.flatMap(this::get);
}
+
+ /**
+ * <p>
+ * Poll for configuration updates.
+ *
+ * <p>
+ * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item
+ * only when an update was detected, ie. when new item is different then last emitted item.
+ *
+ * <p>
+ * For more tailored change detection approach you can:
+ * <ul>
+ * <li>
+ * Use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig}
+ * (<b>experimental API</b>) if you want to react differently to changes in subsets of the configuration.
+ * </li>
+ * <li>
+ * Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with
+ * {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic.
+ * </li>
+ * </ul>
+ *
+ * @param diagnosticContext diagnostic context as defined in Logging Guideline
+ * @param initialDelay delay after first request attempt
+ * @param period frequency of update checks
+ * @return stream of configuration updates
+ */
+ default Flux<JsonObject> updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) {
+ return get(diagnosticContext, initialDelay, period).distinctUntilChanged();
+ }
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
index 7b47b127..0bece14f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
@@ -22,11 +22,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
import io.vavr.Function1;
import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
+@ExperimentalApi
@FunctionalInterface
public interface HashAlgorithm extends Function1<byte[], byte[]> {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
index b27c718e..46c032ea 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
@@ -33,6 +34,7 @@ import reactor.core.publisher.Flux;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
+@ExperimentalApi
public class ListenableCbsConfig {
private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
index 7f24b36e..837a1ca7 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
@@ -30,6 +30,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* An immutable <a href="https://en.wikipedia.org/wiki/Merkle_tree" target="_blank">Merkle Tree</a> implementation.
@@ -40,6 +41,7 @@ import org.jetbrains.annotations.NotNull;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
+@ExperimentalApi
public final class MerkleTree<V> {
private static final String DEFAULT_DIGEST_ALGORITHM = "SHA-256";
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
index 0368c8ad..b6130981 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.util.function.Consumer;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* The listener for changes of the {@link MerkleTree} subtree.
@@ -30,6 +31,7 @@ import java.util.function.Consumer;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
+@ExperimentalApi
@FunctionalInterface
public interface TreeChangeListener<V> extends Consumer<Option<MerkleTree<V>>> {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
index 9ef9fe63..b0824024 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
@@ -21,11 +21,13 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
import io.vavr.Function1;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.2
*/
+@ExperimentalApi
@FunctionalInterface
public interface ValueSerializer<V> extends Function1<V, byte[]> {
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index 309bb62f..8a0977d9 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -98,7 +98,8 @@ class CbsClientImplIT {
final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// when
- final Flux<JsonObject> result = sut.flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
+ final Flux<JsonObject> result = sut
+ .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
// then
final int itemsToTake = 5;
@@ -108,6 +109,25 @@ class CbsClientImplIT {
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWithUpdatesCall() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final Duration period = Duration.ofMillis(10);
+
+ // when
+ final Flux<JsonObject> result = sut
+ .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
+
+ // then
+ final Duration timeToCollectItemsFor = period.multipliedBy(50);
+ StepVerifier.create(result.take(timeToCollectItemsFor).map(this::sampleConfigValue))
+ .expectNext(EXPECTED_CONFIG_VALUE)
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}