diff options
5 files changed, 116 insertions, 54 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java new file mode 100644 index 00000000..2f90f5e9 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.control.Option; +import java.util.Collection; +import java.util.HashSet; + +/** + * @since 1.1.2 + * + * NOTE + * The class is thread unsafe + */ +class CompositeTreeChangeListener<V> implements TreeChangeListener<V> { + + private final Collection<TreeChangeListener<V>> listeners = new HashSet<>(); + + public void addListener(TreeChangeListener<V> listener) { + listeners.add(listener); + } + + public void removeListener(TreeChangeListener<V> listener) { + listeners.remove(listener); + } + + @Override + public void accept(Option<MerkleTree<V>> updatedSubtree) { + for (TreeChangeListener<V> listener : listeners) { + listener.accept(updatedSubtree); + } + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java index 46c032ea..ce065d1d 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java @@ -22,10 +22,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; import io.vavr.collection.List; import io.vavr.control.Option; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -38,27 +34,9 @@ import reactor.core.publisher.Flux; public class ListenableCbsConfig { private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes); - private final Map<List<String>, CompositeTreeChangeListener<String>> pathListeners = new HashMap<>(); - private final Object listenersUpdateMonitor = new Object(); + private final TreePathListener<String> pathListener = new TreePathListener<>(); private final Object treeUpdateMonitor = new Object(); - public void listen(List<String> path, TreeChangeListener<String> listener) { - synchronized (listenersUpdateMonitor) { - CompositeTreeChangeListener<String> compositeListener = pathListeners - .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>()); - compositeListener.addListener(listener); - } - } - - public void cancel(List<String> path, TreeChangeListener<String> listener) { - synchronized (listenersUpdateMonitor) { - CompositeTreeChangeListener<String> compositeListener = pathListeners.get(path); - if (compositeListener != null) { - compositeListener.removeListener(listener); - } - } - } - public Flux<Option<MerkleTree<String>>> subtreeChanges(List<String> path) { return Flux.create(sink -> { final TreeChangeListener<String> listener = sink::next; @@ -67,43 +45,26 @@ public class ListenableCbsConfig { }); } + public void listen(List<String> path, TreeChangeListener<String> listener) { + pathListener.listen(path, listener); + } + + public void cancel(List<String> path, TreeChangeListener<String> listener) { + pathListener.cancel(path, listener); + } + public Disposable subscribeForUpdates(Flux<MerkleTree<String>> updates) { return updates.subscribe(this::update); } public void update(MerkleTree<String> newTree) { final MerkleTree<String> oldTree; + synchronized (treeUpdateMonitor) { oldTree = tree; tree = newTree; } - for (Map.Entry<List<String>, CompositeTreeChangeListener<String>> entry : pathListeners.entrySet()) { - final List<String> path = entry.getKey(); - final CompositeTreeChangeListener<String> listeners = entry.getValue(); - if (!newTree.isSame(path, oldTree)) { - listeners.accept(newTree, path); - } - } - } - - private static class CompositeTreeChangeListener<V> implements TreeChangeListener<V> { - - private final Collection<TreeChangeListener<V>> listeners = new HashSet<>(); - - void addListener(TreeChangeListener<V> listener) { - listeners.add(listener); - } - - void removeListener(TreeChangeListener<V> listener) { - listeners.remove(listener); - } - - @Override - public void accept(Option<MerkleTree<V>> updatedSubtree) { - for (TreeChangeListener<V> listener : listeners) { - listener.accept(updatedSubtree); - } - } + pathListener.update(oldTree, newTree); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java new file mode 100644 index 00000000..6c578317 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.collection.List; +import java.util.HashMap; +import java.util.Map; + +/** + * @since 1.1.2 + */ +class TreePathListener<T> { + + private final Map<List<String>, CompositeTreeChangeListener<T>> pathListeners = new HashMap<>(); + + public synchronized void listen(List<String> path, TreeChangeListener<T> listener) { + CompositeTreeChangeListener<T> compositeListener = pathListeners + .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>()); + + compositeListener.addListener(listener); + } + + public synchronized void cancel(List<String> path, TreeChangeListener<T> listener) { + CompositeTreeChangeListener<T> compositeListener = pathListeners.get(path); + if (compositeListener != null) { + compositeListener.removeListener(listener); + } + } + + public void update(MerkleTree<T> oldTree, MerkleTree<T> newTree) { + pathListeners.forEach((path, listener) -> { + if (!newTree.isSame(path, oldTree)) { + listener.accept(newTree, path); + } + }); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java index 3e77251d..b3ef1bd5 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java @@ -25,8 +25,6 @@ import static org.assertj.core.api.Assertions.assertThat; import io.vavr.collection.List; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; import reactor.core.publisher.Flux; import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; @@ -135,4 +133,4 @@ class ListenableCbsConfigTest { .verifyComplete(); } -}
\ No newline at end of file +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java index 2a9a7fcd..dee1150e 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> |