summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest-services/cbs-client/pom.xml19
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java39
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java107
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java355
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java54
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java32
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java138
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java97
8 files changed, 836 insertions, 5 deletions
diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml
index 2580762d..a3424b63 100644
--- a/rest-services/cbs-client/pom.xml
+++ b/rest-services/cbs-client/pom.xml
@@ -37,6 +37,11 @@
<artifactId>gson</artifactId>
</dependency>
<dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr
+ </artifactId>
+ </dependency>
+ <dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
@@ -62,11 +67,15 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
new file mode 100644
index 00000000..7b47b127
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.Function1;
+import io.vavr.collection.List;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+@FunctionalInterface
+public interface HashAlgorithm extends Function1<byte[], byte[]> {
+
+ @Override
+ default byte[] apply(byte[] bytes) {
+ return apply(List.of(bytes));
+ }
+
+ byte[] apply(Iterable<byte[]> bytes);
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
new file mode 100644
index 00000000..b27c718e
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+public class ListenableCbsConfig {
+
+ private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes);
+ private final Map<List<String>, CompositeTreeChangeListener<String>> pathListeners = new HashMap<>();
+ private final Object listenersUpdateMonitor = new Object();
+ private final Object treeUpdateMonitor = new Object();
+
+ public void listen(List<String> path, TreeChangeListener<String> listener) {
+ synchronized (listenersUpdateMonitor) {
+ CompositeTreeChangeListener<String> compositeListener = pathListeners
+ .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>());
+ compositeListener.addListener(listener);
+ }
+ }
+
+ public void cancel(List<String> path, TreeChangeListener<String> listener) {
+ synchronized (listenersUpdateMonitor) {
+ CompositeTreeChangeListener<String> compositeListener = pathListeners.get(path);
+ if (compositeListener != null) {
+ compositeListener.removeListener(listener);
+ }
+ }
+ }
+
+ public Flux<Option<MerkleTree<String>>> subtreeChanges(List<String> path) {
+ return Flux.create(sink -> {
+ final TreeChangeListener<String> listener = sink::next;
+ sink.onDispose(() -> cancel(path, listener));
+ listen(path, listener);
+ });
+ }
+
+ public Disposable subscribeForUpdates(Flux<MerkleTree<String>> updates) {
+ return updates.subscribe(this::update);
+ }
+
+ public void update(MerkleTree<String> newTree) {
+ final MerkleTree<String> oldTree;
+ synchronized (treeUpdateMonitor) {
+ oldTree = tree;
+ tree = newTree;
+ }
+
+ for (Map.Entry<List<String>, CompositeTreeChangeListener<String>> entry : pathListeners.entrySet()) {
+ final List<String> path = entry.getKey();
+ final CompositeTreeChangeListener<String> listeners = entry.getValue();
+ if (!newTree.isSame(path, oldTree)) {
+ listeners.accept(newTree, path);
+ }
+ }
+ }
+
+ private static class CompositeTreeChangeListener<V> implements TreeChangeListener<V> {
+
+ private final Collection<TreeChangeListener<V>> listeners = new HashSet<>();
+
+ void addListener(TreeChangeListener<V> listener) {
+ listeners.add(listener);
+ }
+
+ void removeListener(TreeChangeListener<V> listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void accept(Option<MerkleTree<V>> updatedSubtree) {
+ for (TreeChangeListener<V> listener : listeners) {
+ listener.accept(updatedSubtree);
+ }
+ }
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
new file mode 100644
index 00000000..7f24b36e
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java
@@ -0,0 +1,355 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.Function1;
+import io.vavr.collection.HashMap;
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Objects;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * An immutable <a href="https://en.wikipedia.org/wiki/Merkle_tree" target="_blank">Merkle Tree</a> implementation.
+ *
+ * Each node is labelled with a {@code String} label. A path of a node is defined as a list of labels from root to a
+ * given node.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+public final class MerkleTree<V> {
+
+ private static final String DEFAULT_DIGEST_ALGORITHM = "SHA-256";
+ private final ValueSerializer<V> valueSerializer;
+ private final HashAlgorithm hashAlgorithm;
+ private final MTNode<V> root;
+ private final Function1<V, byte[]> hashForValue;
+
+ private MerkleTree(
+ @NotNull ValueSerializer<V> valueSerializer,
+ @NotNull HashAlgorithm hashAlgorithm,
+ @NotNull MTNode<V> root) {
+ this.valueSerializer = Objects.requireNonNull(valueSerializer);
+ this.hashAlgorithm = Objects.requireNonNull(hashAlgorithm);
+ this.root = Objects.requireNonNull(root);
+ hashForValue = valueSerializer.andThen(hashAlgorithm);
+ }
+
+ /**
+ * Create an empty tree with given serializer and using default digest algorithm as a hash function.
+ *
+ * @param serializer a way of serializing a value to array of bytes
+ * @param <V> type of values kept in a tree
+ * @return empty tree
+ */
+ public static @NotNull <V> MerkleTree<V> emptyWithDefaultDigest(@NotNull ValueSerializer<V> serializer) {
+ return emptyWithDigest(DEFAULT_DIGEST_ALGORITHM, serializer);
+ }
+
+ /**
+ * Create an empty tree with given serializer and given digest algorithm used as a hash function.
+ *
+ * @param digestAlgorithmName name of a digest algorithm as used by {@link MessageDigest#getInstance(String)}
+ * @param serializer a way of serializing a value to array of bytes
+ * @param <V> type of values kept in a tree
+ * @return empty tree
+ */
+ public static @NotNull <V> MerkleTree<V> emptyWithDigest(
+ @NotNull String digestAlgorithmName,
+ @NotNull ValueSerializer<V> serializer) {
+ return emptyWithHashProvider(serializer, messages -> {
+ final MessageDigest messageDigest = messageDigest(digestAlgorithmName);
+ messages.forEach(messageDigest::update);
+ return messageDigest.digest();
+ });
+ }
+
+ /**
+ * Create an empty tree with given hash function.
+ *
+ * @param serializer a function which serializes values to a byte array
+ * @param hashAlgorithm a function which calculates a hash of a serialized value
+ * @param <V> type of values kept in a tree
+ * @return empty tree
+ */
+ public static <V> MerkleTree<V> emptyWithHashProvider(ValueSerializer<V> serializer, HashAlgorithm hashAlgorithm) {
+ return new MerkleTree<>(serializer, hashAlgorithm, MTNode.empty(hashAlgorithm));
+ }
+
+ private static MessageDigest messageDigest(String digestAlgorithmName) {
+ try {
+ return MessageDigest.getInstance(digestAlgorithmName);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException("Unsupported hash algorithm " + digestAlgorithmName, e);
+ }
+ }
+
+ /**
+ * Assigns a value to a given path.
+ *
+ * Overrides current value if already exists.
+ *
+ * @param value a value to assign
+ * @param path path of labels from root
+ * @return an updated tree instance or <code>this</code> if hashes are the same
+ */
+ public MerkleTree<V> add(V value, String... path) {
+ return add(List.of(path), value);
+ }
+
+ /**
+ * Assigns a value to a given path.
+ *
+ * Overrides current value if already exists.
+ *
+ * @param path path of labels from root
+ * @param value a value to assign
+ * @return an updated tree instance or <code>this</code> if hashes are the same
+ */
+ public MerkleTree<V> add(List<String> path, V value) {
+ final MTNode<V> result = root.addChild(path, MTNode.leaf(hashAlgorithm, hashForValue.apply(value), value));
+ return Arrays.equals(result.hash(), root.hash())
+ ? this
+ : new MerkleTree<>(valueSerializer, hashAlgorithm, result);
+ }
+
+
+ /**
+ * Gets a value assigned to a given path.
+ *
+ * @param path to search for
+ * @return Some(value) if path exists and contains a value, None otherwise
+ */
+ public Option<V> get(String... path) {
+ return get(List.of(path));
+ }
+
+ /**
+ * Gets a value assigned to a given path.
+ *
+ * @param path to search for
+ * @return Some(value) if path exists and contains a value, None otherwise
+ */
+ public Option<V> get(List<String> path) {
+ return root.findNode(path).flatMap(MTNode::value);
+ }
+
+ /**
+ * Checks if nodes under given path are the same in {@code this} and {@code other} tree.
+ *
+ * @param other a tree to compare with
+ * @param path a path to a subtree to compare
+ * @return true if hashes are the same, false otherwise
+ */
+ public boolean isSame(MerkleTree<V> other, String... path) {
+ return isSame(List.of(path), other);
+ }
+
+ /**
+ * Checks if nodes under given path are the same in {@code this} and {@code other} tree.
+ *
+ * @param other a tree to compare with
+ * @param path a path to a subtree to compare
+ * @return true if hashes are the same, false otherwise
+ */
+ public boolean isSame(List<String> path, MerkleTree<V> other) {
+ final byte[] oldHash = other.hashOf(path);
+ final byte[] curHash = hashOf(path);
+ return Arrays.equals(oldHash, curHash);
+ }
+
+ /**
+ * Returns a hash of a node under given path.
+ *
+ * @param path a path of a node to check
+ * @return a hash or empty array if node does not exist
+ */
+ public byte[] hashOf(List<String> path) {
+ return root
+ .findNode(path)
+ .map(node -> node.hash().clone())
+ .getOrElse(() -> new byte[0]);
+ }
+
+ /**
+ * Returns a hash of a node under given path.
+ *
+ * @param path a path of a node to check
+ * @return a hash or empty array if node does not exist
+ */
+ public byte[] hashOf(String... path) {
+ return hashOf(List.of(path));
+ }
+
+ /**
+ * Returns a subtree with given node as a root.
+ *
+ * @param path a path of a node to be a subtree root
+ * @return Some(subtree) if path exists, None otherwise
+ */
+ public Option<MerkleTree<V>> subtree(List<String> path) {
+ return root.findNode(path).map(node -> new MerkleTree<>(valueSerializer, hashAlgorithm, node));
+ }
+
+ /**
+ * Returns a subtree with given node as a root.
+ *
+ * @param path a path of a node to be a subtree root
+ * @return Some(subtree) if path exists, None otherwise
+ */
+ public Option<MerkleTree<V>> subtree(String... path) {
+ return subtree(List.of(path));
+ }
+
+ /**
+ * Hash of a root node.
+ *
+ * @return a copy of root node's hash
+ */
+ public byte[] hash() {
+ return root.hash().clone();
+ }
+
+ @Override
+ public String toString() {
+ return root.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MerkleTree<?> that = (MerkleTree<?>) o;
+ return Objects.equals(root, that.root);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(root);
+ }
+}
+
+final class MTNode<V> {
+
+ private final byte[] hash;
+ private final Option<V> value;
+ private final Map<String, MTNode<V>> children;
+ private final HashAlgorithm hashAlgorithm;
+
+ static <V> MTNode<V> empty(HashAlgorithm hashAlgorithm) {
+ return new MTNode<>(hashAlgorithm, new byte[0], Option.none(), HashMap.empty());
+ }
+
+ static <V> MTNode<V> leaf(HashAlgorithm hashAlgorithm, byte[] hash, V value) {
+ return new MTNode<>(hashAlgorithm, hash, Option.of(value), HashMap.empty());
+ }
+
+ private MTNode(
+ HashAlgorithm hashAlgorithm,
+ byte[] hash,
+ Option<V> value,
+ Map<String, MTNode<V>> children) {
+ this.hashAlgorithm = hashAlgorithm;
+ this.hash = hash.clone();
+ this.value = value;
+ this.children = children;
+ }
+
+ MTNode<V> addChild(final List<String> path, final MTNode<V> child) {
+ if (path.isEmpty()) {
+ return child;
+ } else {
+ String label = path.head();
+ MTNode<V> newChild = children.get(label).fold(
+ () -> MTNode.<V>empty(hashAlgorithm).addChild(path.tail(), child),
+ node -> node.addChild(path.tail(), child)
+ );
+ return addChild(label, newChild);
+ }
+ }
+
+ Option<V> value() {
+ return value;
+ }
+
+ Option<MTNode<V>> findNode(List<String> path) {
+ return path.headOption().fold(
+ () -> Option.of(this),
+ head -> children.get(head).flatMap(child -> child.findNode(path.tail()))
+ );
+ }
+
+ byte[] hash() {
+ return hash;
+ }
+
+ private MTNode<V> addChild(String label, MTNode<V> child) {
+ final Map<String, MTNode<V>> newChildren = children.put(label, child);
+ byte[] newHash = composeHashes(newChildren.iterator(this::hashForChild));
+ return Arrays.equals(newHash, hash) ? this : new MTNode<>(
+ hashAlgorithm,
+ newHash,
+ value,
+ newChildren
+ );
+ }
+
+ private byte[] hashForChild(String label, MTNode<V> child) {
+ return composeHashes(List.of(label.getBytes(), child.hash()));
+ }
+
+ private byte[] composeHashes(Iterable<byte[]> hashes) {
+ return hashAlgorithm.apply(hashes);
+ }
+
+ @Override
+ public String toString() {
+ return "(\"" + value.map(Object::toString).getOrElse("") + "\" ["
+ + children.map(entry -> entry._1 + "=" + entry._2).mkString(", ")
+ + "])";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MTNode<?> mtNode = (MTNode<?>) o;
+ return Arrays.equals(hash, mtNode.hash);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(hash);
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
new file mode 100644
index 00000000..0368c8ad
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import java.util.function.Consumer;
+
+/**
+ * The listener for changes of the {@link MerkleTree} subtree.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+@FunctionalInterface
+public interface TreeChangeListener<V> extends Consumer<Option<MerkleTree<V>>> {
+
+ /**
+ * Will be called when a change in a subtree has been detected. Default implementation will extract the changed subtree
+ * from the root and call {@link #accept(Option)}.
+ *
+ * @param updatedTree new, updated root tree
+ * @param path a changed path
+ */
+ default void accept(MerkleTree<V> updatedTree, List<String> path) {
+ accept(updatedTree.subtree(path));
+ }
+
+ /**
+ * Will be called when a change in a subtree has been detected.
+ *
+ * @param updatedSubtree new, updated subtree or None when branch has been removed
+ */
+ @Override
+ void accept(Option<MerkleTree<V>> updatedSubtree);
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
new file mode 100644
index 00000000..9ef9fe63
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.Function1;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.2
+ */
+@FunctionalInterface
+public interface ValueSerializer<V> extends Function1<V, byte[]> {
+
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java
new file mode 100644
index 00000000..3e77251d
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java
@@ -0,0 +1,138 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.vavr.collection.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.ReplayProcessor;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since February 2019
+ */
+class ListenableCbsConfigTest {
+
+ @Test
+ void listen_shouldCallListenerAfterEachChange() {
+ ListenableCbsConfig cut = new ListenableCbsConfig();
+
+ final List<String> expectedChanges = List.of("1", "2", "3");
+ final AtomicReference<List<String>> actualChanges = new AtomicReference<>(List.empty());
+
+ cut.listen(List.of("some-key"), subtreeOption ->
+ actualChanges.updateAndGet(
+ changes -> changes.append(subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")))
+
+ );
+
+ final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
+ .add(List.of("some-key"), "1");
+
+ final MerkleTree<String> updatedConfig1 = initialConfig
+ .add(List.of("some-key"), "2");
+
+ final MerkleTree<String> updatedConfig2 = updatedConfig1
+ .add(List.of("some-key"), "3");
+
+ cut.update(initialConfig);
+ cut.update(updatedConfig1);
+ cut.update(updatedConfig2);
+
+ assertThat(actualChanges.get()).isEqualTo(expectedChanges);
+
+ }
+
+
+ @Test
+ void subtreeChanges_shouldEmitItemOnEachChange() {
+ ListenableCbsConfig cut = new ListenableCbsConfig();
+
+ final ReplayProcessor<String> replayProcessor = ReplayProcessor.create();
+ final List<String> expectedChanges = List.of("1", "2", "3");
+
+ cut.subtreeChanges(List.of("some-key"))
+ .map(subtreeOption ->
+ subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]")
+ )
+ .subscribe(replayProcessor);
+
+ final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
+ .add(List.of("some-key"), "1");
+
+ final MerkleTree<String> updatedConfig1 = initialConfig
+ .add(List.of("some-key"), "2");
+
+ final MerkleTree<String> updatedConfig2 = updatedConfig1
+ .add(List.of("some-key"), "3");
+
+ cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2));
+
+ StepVerifier.create(replayProcessor.take(expectedChanges.size()))
+ .expectNextSequence(expectedChanges)
+ .verifyComplete();
+
+ }
+
+ @Test
+ void subtreeChanges_shouldEmitItemOnEachActualChangeAndWhenNodeHasBeenRemoved() {
+ ListenableCbsConfig cut = new ListenableCbsConfig();
+
+ final ReplayProcessor<String> actualChanges = ReplayProcessor.create();
+ final List<String> expectedChanges = List.of("http://dmaap/topic1", "http://dmaap/topic1-updated", "[None]");
+
+ cut.subtreeChanges(List.of("streams", "publishes"))
+ .map(subtreeOption ->
+ subtreeOption.flatMap(subtree -> subtree.get("topic1", "dmaap-url")).getOrElse("[None]")
+ )
+ .subscribe(actualChanges);
+
+ final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes)
+ .add(List.of("collector", "treshold"), "145")
+ .add(List.of("collector", "listenPort"), "8080");
+
+ final MerkleTree<String> updatedConfig1 = initialConfig
+ .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
+ .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1");
+
+ final MerkleTree<String> updatedConfig2 = updatedConfig1
+ .add(List.of("streams", "publishes", "topic1", "type"), "message-bus")
+ .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1-updated");
+
+ final MerkleTree<String> updatedConfig3 = updatedConfig2
+ .add(List.of("collector", "treshold"), "1410");
+
+ final MerkleTree<String> updatedConfig4 = initialConfig;
+
+ cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2, updatedConfig3, updatedConfig4));
+
+ StepVerifier.create(actualChanges.take(expectedChanges.size()))
+ .expectNextSequence(expectedChanges)
+ .verifyComplete();
+
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java
new file mode 100644
index 00000000..2a9a7fcd
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java
@@ -0,0 +1,97 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since February 2019
+ */
+class MerkleTreeTest {
+
+ @Test
+ void shouldBeAbleToGetEntries() {
+ MerkleTree<String> cut = emptyTree()
+ .add("value1", "ala", "ma", "kota")
+ .add("value2", "ala", "ma", "psa");
+
+ assertThat(cut.get("ala", "ma", "kota")).contains("value1");
+ assertThat(cut.get("ala", "ma", "psa")).contains("value2");
+ }
+
+ @Test
+ void shouldReturnNoneForNonExistingPaths() {
+ MerkleTree<String> cut = emptyTree()
+ .add("value1", "ala", "ma", "kota")
+ .add("value2", "ala", "ma", "psa");
+
+ assertThat(cut.get("ala", "je", "obiad")).isEmpty();
+ }
+
+ @Test
+ void shouldReturnNoneWhenNodeDoesntContainValue() {
+ MerkleTree<String> cut = emptyTree()
+ .add("value1", "ala", "ma", "kota")
+ .add("value2", "ala", "ma", "psa");
+
+ assertThat(cut.get("ala", "ma")).isEmpty();
+ }
+
+
+ @Test
+ void shouldNotCreateNewObjectWhenNothingChanged() {
+ MerkleTree<String> cut = emptyTree()
+ .add("some value", "ala", "ma", "kota");
+
+ final MerkleTree<String> result = cut.add("some value", "ala", "ma", "kota");
+
+ assertThat(result).isSameAs(cut);
+ }
+
+ @Test
+ void shouldRecalculateHashesAfterAddingNewNode() {
+ MerkleTree<String> cut = emptyTree()
+ .add("value1", "ala", "ma", "kota")
+ .add("value2", "ala", "ma", "psa")
+ .add("value3", "ala", "name");
+
+ final MerkleTree<String> modified = cut.add("value4", "ala", "surname");
+
+ assertThat(modified).isNotSameAs(cut);
+
+ assertThat(modified.hashOf("ala", "ma")).isEqualTo(cut.hashOf("ala", "ma"));
+ assertThat(modified.hashOf("ala", "ma", "kota")).isEqualTo(cut.hashOf("ala", "ma", "kota"));
+ assertThat(modified.hashOf("ala", "ma", "psa")).isEqualTo(cut.hashOf("ala", "ma", "psa"));
+ assertThat(modified.hashOf("ala", "name")).isEqualTo(cut.hashOf("ala", "name"));
+
+ assertThat(modified.hashOf("ala", "surname")).isNotEqualTo(cut.hashOf("ala", "surname"));
+ assertThat(modified.hashOf("ala")).isNotEqualTo(cut.hashOf("ala"));
+ assertThat(modified.hash()).isNotEqualTo(cut.hash());
+ }
+
+ private MerkleTree<String> emptyTree() {
+ return MerkleTree.emptyWithDefaultDigest(String::getBytes);
+ }
+} \ No newline at end of file