From ab27d8c611a8b780ff656bc72f50c3c880619830 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Fri, 22 Feb 2019 17:24:29 +0100 Subject: Improve ListenableCbsConfig Change-Id: I6a3d7cbf78fef398b9b82080e6ce762dcfe24376 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1274 --- .../api/listener/CompositeTreeChangeListener.java | 50 ++++++++++++++++++ .../client/api/listener/ListenableCbsConfig.java | 61 ++++------------------ .../cbs/client/api/listener/TreePathListener.java | 54 +++++++++++++++++++ .../api/listener/ListenableCbsConfigTest.java | 4 +- .../cbs/client/api/listener/MerkleTreeTest.java | 1 - 5 files changed, 116 insertions(+), 54 deletions(-) create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java (limited to 'rest-services/cbs-client/src') diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java new file mode 100644 index 00000000..2f90f5e9 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.control.Option; +import java.util.Collection; +import java.util.HashSet; + +/** + * @since 1.1.2 + * + * NOTE + * The class is thread unsafe + */ +class CompositeTreeChangeListener implements TreeChangeListener { + + private final Collection> listeners = new HashSet<>(); + + public void addListener(TreeChangeListener listener) { + listeners.add(listener); + } + + public void removeListener(TreeChangeListener listener) { + listeners.remove(listener); + } + + @Override + public void accept(Option> updatedSubtree) { + for (TreeChangeListener listener : listeners) { + listener.accept(updatedSubtree); + } + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java index 46c032ea..ce065d1d 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java @@ -22,10 +22,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; import io.vavr.collection.List; import io.vavr.control.Option; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -38,27 +34,9 @@ import reactor.core.publisher.Flux; public class ListenableCbsConfig { private MerkleTree tree = MerkleTree.emptyWithDefaultDigest(String::getBytes); - private final Map, CompositeTreeChangeListener> pathListeners = new HashMap<>(); - private final Object listenersUpdateMonitor = new Object(); + private final TreePathListener pathListener = new TreePathListener<>(); private final Object treeUpdateMonitor = new Object(); - public void listen(List path, TreeChangeListener listener) { - synchronized (listenersUpdateMonitor) { - CompositeTreeChangeListener compositeListener = pathListeners - .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>()); - compositeListener.addListener(listener); - } - } - - public void cancel(List path, TreeChangeListener listener) { - synchronized (listenersUpdateMonitor) { - CompositeTreeChangeListener compositeListener = pathListeners.get(path); - if (compositeListener != null) { - compositeListener.removeListener(listener); - } - } - } - public Flux>> subtreeChanges(List path) { return Flux.create(sink -> { final TreeChangeListener listener = sink::next; @@ -67,43 +45,26 @@ public class ListenableCbsConfig { }); } + public void listen(List path, TreeChangeListener listener) { + pathListener.listen(path, listener); + } + + public void cancel(List path, TreeChangeListener listener) { + pathListener.cancel(path, listener); + } + public Disposable subscribeForUpdates(Flux> updates) { return updates.subscribe(this::update); } public void update(MerkleTree newTree) { final MerkleTree oldTree; + synchronized (treeUpdateMonitor) { oldTree = tree; tree = newTree; } - for (Map.Entry, CompositeTreeChangeListener> entry : pathListeners.entrySet()) { - final List path = entry.getKey(); - final CompositeTreeChangeListener listeners = entry.getValue(); - if (!newTree.isSame(path, oldTree)) { - listeners.accept(newTree, path); - } - } - } - - private static class CompositeTreeChangeListener implements TreeChangeListener { - - private final Collection> listeners = new HashSet<>(); - - void addListener(TreeChangeListener listener) { - listeners.add(listener); - } - - void removeListener(TreeChangeListener listener) { - listeners.remove(listener); - } - - @Override - public void accept(Option> updatedSubtree) { - for (TreeChangeListener listener : listeners) { - listener.accept(updatedSubtree); - } - } + pathListener.update(oldTree, newTree); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java new file mode 100644 index 00000000..6c578317 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; + +import io.vavr.collection.List; +import java.util.HashMap; +import java.util.Map; + +/** + * @since 1.1.2 + */ +class TreePathListener { + + private final Map, CompositeTreeChangeListener> pathListeners = new HashMap<>(); + + public synchronized void listen(List path, TreeChangeListener listener) { + CompositeTreeChangeListener compositeListener = pathListeners + .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>()); + + compositeListener.addListener(listener); + } + + public synchronized void cancel(List path, TreeChangeListener listener) { + CompositeTreeChangeListener compositeListener = pathListeners.get(path); + if (compositeListener != null) { + compositeListener.removeListener(listener); + } + } + + public void update(MerkleTree oldTree, MerkleTree newTree) { + pathListeners.forEach((path, listener) -> { + if (!newTree.isSame(path, oldTree)) { + listener.accept(newTree, path); + } + }); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java index 3e77251d..b3ef1bd5 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java @@ -25,8 +25,6 @@ import static org.assertj.core.api.Assertions.assertThat; import io.vavr.collection.List; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; import reactor.core.publisher.Flux; import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; @@ -135,4 +133,4 @@ class ListenableCbsConfigTest { .verifyComplete(); } -} \ No newline at end of file +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java index 2a9a7fcd..dee1150e 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; /** * @author Piotr Jaszczyk -- cgit 1.2.3-korg