diff options
8 files changed, 836 insertions, 5 deletions
diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml index 2580762d..a3424b63 100644 --- a/rest-services/cbs-client/pom.xml +++ b/rest-services/cbs-client/pom.xml @@ -37,6 +37,11 @@ <artifactId>gson</artifactId> </dependency> <dependency> + <groupId>io.vavr</groupId> + <artifactId>vavr + </artifactId> + </dependency> + <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> @@ -62,11 +67,15 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <scope>test</scope> - </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java new file mode 100644 index 00000000..7b47b127 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/HashAlgorithm.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.Function1; +import io.vavr.collection.List; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.2 + */ +@FunctionalInterface +public interface HashAlgorithm extends Function1<byte[], byte[]> { + + @Override + default byte[] apply(byte[] bytes) { + return apply(List.of(bytes)); + } + + byte[] apply(Iterable<byte[]> bytes); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java new file mode 100644 index 00000000..b27c718e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.collection.List; +import io.vavr.control.Option; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.2 + */ +public class ListenableCbsConfig { + + private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes); + private final Map<List<String>, CompositeTreeChangeListener<String>> pathListeners = new HashMap<>(); + private final Object listenersUpdateMonitor = new Object(); + private final Object treeUpdateMonitor = new Object(); + + public void listen(List<String> path, TreeChangeListener<String> listener) { + synchronized (listenersUpdateMonitor) { + CompositeTreeChangeListener<String> compositeListener = pathListeners + .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>()); + compositeListener.addListener(listener); + } + } + + public void cancel(List<String> path, TreeChangeListener<String> listener) { + synchronized (listenersUpdateMonitor) { + CompositeTreeChangeListener<String> compositeListener = pathListeners.get(path); + if (compositeListener != null) { + compositeListener.removeListener(listener); + } + } + } + + public Flux<Option<MerkleTree<String>>> subtreeChanges(List<String> path) { + return Flux.create(sink -> { + final TreeChangeListener<String> listener = sink::next; + sink.onDispose(() -> cancel(path, listener)); + listen(path, listener); + }); + } + + public Disposable subscribeForUpdates(Flux<MerkleTree<String>> updates) { + return updates.subscribe(this::update); + } + + public void update(MerkleTree<String> newTree) { + final MerkleTree<String> oldTree; + synchronized (treeUpdateMonitor) { + oldTree = tree; + tree = newTree; + } + + for (Map.Entry<List<String>, CompositeTreeChangeListener<String>> entry : pathListeners.entrySet()) { + final List<String> path = entry.getKey(); + final CompositeTreeChangeListener<String> listeners = entry.getValue(); + if (!newTree.isSame(path, oldTree)) { + listeners.accept(newTree, path); + } + } + } + + private static class CompositeTreeChangeListener<V> implements TreeChangeListener<V> { + + private final Collection<TreeChangeListener<V>> listeners = new HashSet<>(); + + void addListener(TreeChangeListener<V> listener) { + listeners.add(listener); + } + + void removeListener(TreeChangeListener<V> listener) { + listeners.remove(listener); + } + + @Override + public void accept(Option<MerkleTree<V>> updatedSubtree) { + for (TreeChangeListener<V> listener : listeners) { + listener.accept(updatedSubtree); + } + } + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java new file mode 100644 index 00000000..7f24b36e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTree.java @@ -0,0 +1,355 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.Function1; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Objects; +import org.jetbrains.annotations.NotNull; + +/** + * An immutable <a href="https://en.wikipedia.org/wiki/Merkle_tree" target="_blank">Merkle Tree</a> implementation. + * + * Each node is labelled with a {@code String} label. A path of a node is defined as a list of labels from root to a + * given node. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.2 + */ +public final class MerkleTree<V> { + + private static final String DEFAULT_DIGEST_ALGORITHM = "SHA-256"; + private final ValueSerializer<V> valueSerializer; + private final HashAlgorithm hashAlgorithm; + private final MTNode<V> root; + private final Function1<V, byte[]> hashForValue; + + private MerkleTree( + @NotNull ValueSerializer<V> valueSerializer, + @NotNull HashAlgorithm hashAlgorithm, + @NotNull MTNode<V> root) { + this.valueSerializer = Objects.requireNonNull(valueSerializer); + this.hashAlgorithm = Objects.requireNonNull(hashAlgorithm); + this.root = Objects.requireNonNull(root); + hashForValue = valueSerializer.andThen(hashAlgorithm); + } + + /** + * Create an empty tree with given serializer and using default digest algorithm as a hash function. + * + * @param serializer a way of serializing a value to array of bytes + * @param <V> type of values kept in a tree + * @return empty tree + */ + public static @NotNull <V> MerkleTree<V> emptyWithDefaultDigest(@NotNull ValueSerializer<V> serializer) { + return emptyWithDigest(DEFAULT_DIGEST_ALGORITHM, serializer); + } + + /** + * Create an empty tree with given serializer and given digest algorithm used as a hash function. + * + * @param digestAlgorithmName name of a digest algorithm as used by {@link MessageDigest#getInstance(String)} + * @param serializer a way of serializing a value to array of bytes + * @param <V> type of values kept in a tree + * @return empty tree + */ + public static @NotNull <V> MerkleTree<V> emptyWithDigest( + @NotNull String digestAlgorithmName, + @NotNull ValueSerializer<V> serializer) { + return emptyWithHashProvider(serializer, messages -> { + final MessageDigest messageDigest = messageDigest(digestAlgorithmName); + messages.forEach(messageDigest::update); + return messageDigest.digest(); + }); + } + + /** + * Create an empty tree with given hash function. + * + * @param serializer a function which serializes values to a byte array + * @param hashAlgorithm a function which calculates a hash of a serialized value + * @param <V> type of values kept in a tree + * @return empty tree + */ + public static <V> MerkleTree<V> emptyWithHashProvider(ValueSerializer<V> serializer, HashAlgorithm hashAlgorithm) { + return new MerkleTree<>(serializer, hashAlgorithm, MTNode.empty(hashAlgorithm)); + } + + private static MessageDigest messageDigest(String digestAlgorithmName) { + try { + return MessageDigest.getInstance(digestAlgorithmName); + } catch (NoSuchAlgorithmException e) { + throw new IllegalArgumentException("Unsupported hash algorithm " + digestAlgorithmName, e); + } + } + + /** + * Assigns a value to a given path. + * + * Overrides current value if already exists. + * + * @param value a value to assign + * @param path path of labels from root + * @return an updated tree instance or <code>this</code> if hashes are the same + */ + public MerkleTree<V> add(V value, String... path) { + return add(List.of(path), value); + } + + /** + * Assigns a value to a given path. + * + * Overrides current value if already exists. + * + * @param path path of labels from root + * @param value a value to assign + * @return an updated tree instance or <code>this</code> if hashes are the same + */ + public MerkleTree<V> add(List<String> path, V value) { + final MTNode<V> result = root.addChild(path, MTNode.leaf(hashAlgorithm, hashForValue.apply(value), value)); + return Arrays.equals(result.hash(), root.hash()) + ? this + : new MerkleTree<>(valueSerializer, hashAlgorithm, result); + } + + + /** + * Gets a value assigned to a given path. + * + * @param path to search for + * @return Some(value) if path exists and contains a value, None otherwise + */ + public Option<V> get(String... path) { + return get(List.of(path)); + } + + /** + * Gets a value assigned to a given path. + * + * @param path to search for + * @return Some(value) if path exists and contains a value, None otherwise + */ + public Option<V> get(List<String> path) { + return root.findNode(path).flatMap(MTNode::value); + } + + /** + * Checks if nodes under given path are the same in {@code this} and {@code other} tree. + * + * @param other a tree to compare with + * @param path a path to a subtree to compare + * @return true if hashes are the same, false otherwise + */ + public boolean isSame(MerkleTree<V> other, String... path) { + return isSame(List.of(path), other); + } + + /** + * Checks if nodes under given path are the same in {@code this} and {@code other} tree. + * + * @param other a tree to compare with + * @param path a path to a subtree to compare + * @return true if hashes are the same, false otherwise + */ + public boolean isSame(List<String> path, MerkleTree<V> other) { + final byte[] oldHash = other.hashOf(path); + final byte[] curHash = hashOf(path); + return Arrays.equals(oldHash, curHash); + } + + /** + * Returns a hash of a node under given path. + * + * @param path a path of a node to check + * @return a hash or empty array if node does not exist + */ + public byte[] hashOf(List<String> path) { + return root + .findNode(path) + .map(node -> node.hash().clone()) + .getOrElse(() -> new byte[0]); + } + + /** + * Returns a hash of a node under given path. + * + * @param path a path of a node to check + * @return a hash or empty array if node does not exist + */ + public byte[] hashOf(String... path) { + return hashOf(List.of(path)); + } + + /** + * Returns a subtree with given node as a root. + * + * @param path a path of a node to be a subtree root + * @return Some(subtree) if path exists, None otherwise + */ + public Option<MerkleTree<V>> subtree(List<String> path) { + return root.findNode(path).map(node -> new MerkleTree<>(valueSerializer, hashAlgorithm, node)); + } + + /** + * Returns a subtree with given node as a root. + * + * @param path a path of a node to be a subtree root + * @return Some(subtree) if path exists, None otherwise + */ + public Option<MerkleTree<V>> subtree(String... path) { + return subtree(List.of(path)); + } + + /** + * Hash of a root node. + * + * @return a copy of root node's hash + */ + public byte[] hash() { + return root.hash().clone(); + } + + @Override + public String toString() { + return root.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MerkleTree<?> that = (MerkleTree<?>) o; + return Objects.equals(root, that.root); + } + + @Override + public int hashCode() { + return Objects.hash(root); + } +} + +final class MTNode<V> { + + private final byte[] hash; + private final Option<V> value; + private final Map<String, MTNode<V>> children; + private final HashAlgorithm hashAlgorithm; + + static <V> MTNode<V> empty(HashAlgorithm hashAlgorithm) { + return new MTNode<>(hashAlgorithm, new byte[0], Option.none(), HashMap.empty()); + } + + static <V> MTNode<V> leaf(HashAlgorithm hashAlgorithm, byte[] hash, V value) { + return new MTNode<>(hashAlgorithm, hash, Option.of(value), HashMap.empty()); + } + + private MTNode( + HashAlgorithm hashAlgorithm, + byte[] hash, + Option<V> value, + Map<String, MTNode<V>> children) { + this.hashAlgorithm = hashAlgorithm; + this.hash = hash.clone(); + this.value = value; + this.children = children; + } + + MTNode<V> addChild(final List<String> path, final MTNode<V> child) { + if (path.isEmpty()) { + return child; + } else { + String label = path.head(); + MTNode<V> newChild = children.get(label).fold( + () -> MTNode.<V>empty(hashAlgorithm).addChild(path.tail(), child), + node -> node.addChild(path.tail(), child) + ); + return addChild(label, newChild); + } + } + + Option<V> value() { + return value; + } + + Option<MTNode<V>> findNode(List<String> path) { + return path.headOption().fold( + () -> Option.of(this), + head -> children.get(head).flatMap(child -> child.findNode(path.tail())) + ); + } + + byte[] hash() { + return hash; + } + + private MTNode<V> addChild(String label, MTNode<V> child) { + final Map<String, MTNode<V>> newChildren = children.put(label, child); + byte[] newHash = composeHashes(newChildren.iterator(this::hashForChild)); + return Arrays.equals(newHash, hash) ? this : new MTNode<>( + hashAlgorithm, + newHash, + value, + newChildren + ); + } + + private byte[] hashForChild(String label, MTNode<V> child) { + return composeHashes(List.of(label.getBytes(), child.hash())); + } + + private byte[] composeHashes(Iterable<byte[]> hashes) { + return hashAlgorithm.apply(hashes); + } + + @Override + public String toString() { + return "(\"" + value.map(Object::toString).getOrElse("") + "\" [" + + children.map(entry -> entry._1 + "=" + entry._2).mkString(", ") + + "])"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MTNode<?> mtNode = (MTNode<?>) o; + return Arrays.equals(hash, mtNode.hash); + } + + @Override + public int hashCode() { + return Arrays.hashCode(hash); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java new file mode 100644 index 00000000..0368c8ad --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreeChangeListener.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.collection.List; +import io.vavr.control.Option; +import java.util.function.Consumer; + +/** + * The listener for changes of the {@link MerkleTree} subtree. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.2 + */ +@FunctionalInterface +public interface TreeChangeListener<V> extends Consumer<Option<MerkleTree<V>>> { + + /** + * Will be called when a change in a subtree has been detected. Default implementation will extract the changed subtree + * from the root and call {@link #accept(Option)}. + * + * @param updatedTree new, updated root tree + * @param path a changed path + */ + default void accept(MerkleTree<V> updatedTree, List<String> path) { + accept(updatedTree.subtree(path)); + } + + /** + * Will be called when a change in a subtree has been detected. + * + * @param updatedSubtree new, updated subtree or None when branch has been removed + */ + @Override + void accept(Option<MerkleTree<V>> updatedSubtree); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java new file mode 100644 index 00000000..9ef9fe63 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ValueSerializer.java @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.Function1; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.2 + */ +@FunctionalInterface +public interface ValueSerializer<V> extends Function1<V, byte[]> { + +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java new file mode 100644 index 00000000..3e77251d --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java @@ -0,0 +1,138 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.vavr.collection.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; +import reactor.core.publisher.Flux; +import reactor.core.publisher.ReplayProcessor; +import reactor.test.StepVerifier; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since February 2019 + */ +class ListenableCbsConfigTest { + + @Test + void listen_shouldCallListenerAfterEachChange() { + ListenableCbsConfig cut = new ListenableCbsConfig(); + + final List<String> expectedChanges = List.of("1", "2", "3"); + final AtomicReference<List<String>> actualChanges = new AtomicReference<>(List.empty()); + + cut.listen(List.of("some-key"), subtreeOption -> + actualChanges.updateAndGet( + changes -> changes.append(subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]"))) + + ); + + final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes) + .add(List.of("some-key"), "1"); + + final MerkleTree<String> updatedConfig1 = initialConfig + .add(List.of("some-key"), "2"); + + final MerkleTree<String> updatedConfig2 = updatedConfig1 + .add(List.of("some-key"), "3"); + + cut.update(initialConfig); + cut.update(updatedConfig1); + cut.update(updatedConfig2); + + assertThat(actualChanges.get()).isEqualTo(expectedChanges); + + } + + + @Test + void subtreeChanges_shouldEmitItemOnEachChange() { + ListenableCbsConfig cut = new ListenableCbsConfig(); + + final ReplayProcessor<String> replayProcessor = ReplayProcessor.create(); + final List<String> expectedChanges = List.of("1", "2", "3"); + + cut.subtreeChanges(List.of("some-key")) + .map(subtreeOption -> + subtreeOption.flatMap(subtree -> subtree.get()).getOrElse("[None]") + ) + .subscribe(replayProcessor); + + final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes) + .add(List.of("some-key"), "1"); + + final MerkleTree<String> updatedConfig1 = initialConfig + .add(List.of("some-key"), "2"); + + final MerkleTree<String> updatedConfig2 = updatedConfig1 + .add(List.of("some-key"), "3"); + + cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2)); + + StepVerifier.create(replayProcessor.take(expectedChanges.size())) + .expectNextSequence(expectedChanges) + .verifyComplete(); + + } + + @Test + void subtreeChanges_shouldEmitItemOnEachActualChangeAndWhenNodeHasBeenRemoved() { + ListenableCbsConfig cut = new ListenableCbsConfig(); + + final ReplayProcessor<String> actualChanges = ReplayProcessor.create(); + final List<String> expectedChanges = List.of("http://dmaap/topic1", "http://dmaap/topic1-updated", "[None]"); + + cut.subtreeChanges(List.of("streams", "publishes")) + .map(subtreeOption -> + subtreeOption.flatMap(subtree -> subtree.get("topic1", "dmaap-url")).getOrElse("[None]") + ) + .subscribe(actualChanges); + + final MerkleTree<String> initialConfig = MerkleTree.<String>emptyWithDefaultDigest(String::getBytes) + .add(List.of("collector", "treshold"), "145") + .add(List.of("collector", "listenPort"), "8080"); + + final MerkleTree<String> updatedConfig1 = initialConfig + .add(List.of("streams", "publishes", "topic1", "type"), "message-bus") + .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1"); + + final MerkleTree<String> updatedConfig2 = updatedConfig1 + .add(List.of("streams", "publishes", "topic1", "type"), "message-bus") + .add(List.of("streams", "publishes", "topic1", "dmaap-url"), "http://dmaap/topic1-updated"); + + final MerkleTree<String> updatedConfig3 = updatedConfig2 + .add(List.of("collector", "treshold"), "1410"); + + final MerkleTree<String> updatedConfig4 = initialConfig; + + cut.subscribeForUpdates(Flux.just(initialConfig, updatedConfig1, updatedConfig2, updatedConfig3, updatedConfig4)); + + StepVerifier.create(actualChanges.take(expectedChanges.size())) + .expectNextSequence(expectedChanges) + .verifyComplete(); + + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java new file mode 100644 index 00000000..2a9a7fcd --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since February 2019 + */ +class MerkleTreeTest { + + @Test + void shouldBeAbleToGetEntries() { + MerkleTree<String> cut = emptyTree() + .add("value1", "ala", "ma", "kota") + .add("value2", "ala", "ma", "psa"); + + assertThat(cut.get("ala", "ma", "kota")).contains("value1"); + assertThat(cut.get("ala", "ma", "psa")).contains("value2"); + } + + @Test + void shouldReturnNoneForNonExistingPaths() { + MerkleTree<String> cut = emptyTree() + .add("value1", "ala", "ma", "kota") + .add("value2", "ala", "ma", "psa"); + + assertThat(cut.get("ala", "je", "obiad")).isEmpty(); + } + + @Test + void shouldReturnNoneWhenNodeDoesntContainValue() { + MerkleTree<String> cut = emptyTree() + .add("value1", "ala", "ma", "kota") + .add("value2", "ala", "ma", "psa"); + + assertThat(cut.get("ala", "ma")).isEmpty(); + } + + + @Test + void shouldNotCreateNewObjectWhenNothingChanged() { + MerkleTree<String> cut = emptyTree() + .add("some value", "ala", "ma", "kota"); + + final MerkleTree<String> result = cut.add("some value", "ala", "ma", "kota"); + + assertThat(result).isSameAs(cut); + } + + @Test + void shouldRecalculateHashesAfterAddingNewNode() { + MerkleTree<String> cut = emptyTree() + .add("value1", "ala", "ma", "kota") + .add("value2", "ala", "ma", "psa") + .add("value3", "ala", "name"); + + final MerkleTree<String> modified = cut.add("value4", "ala", "surname"); + + assertThat(modified).isNotSameAs(cut); + + assertThat(modified.hashOf("ala", "ma")).isEqualTo(cut.hashOf("ala", "ma")); + assertThat(modified.hashOf("ala", "ma", "kota")).isEqualTo(cut.hashOf("ala", "ma", "kota")); + assertThat(modified.hashOf("ala", "ma", "psa")).isEqualTo(cut.hashOf("ala", "ma", "psa")); + assertThat(modified.hashOf("ala", "name")).isEqualTo(cut.hashOf("ala", "name")); + + assertThat(modified.hashOf("ala", "surname")).isNotEqualTo(cut.hashOf("ala", "surname")); + assertThat(modified.hashOf("ala")).isNotEqualTo(cut.hashOf("ala")); + assertThat(modified.hash()).isNotEqualTo(cut.hash()); + } + + private MerkleTree<String> emptyTree() { + return MerkleTree.emptyWithDefaultDigest(String::getBytes); + } +}
\ No newline at end of file |