summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java50
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java61
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java54
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java4
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java1
5 files changed, 116 insertions, 54 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java
new file mode 100644
index 00000000..2f90f5e9
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/CompositeTreeChangeListener.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.control.Option;
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * @since 1.1.2
+ *
+ * NOTE
+ * The class is thread unsafe
+ */
+class CompositeTreeChangeListener<V> implements TreeChangeListener<V> {
+
+ private final Collection<TreeChangeListener<V>> listeners = new HashSet<>();
+
+ public void addListener(TreeChangeListener<V> listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(TreeChangeListener<V> listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void accept(Option<MerkleTree<V>> updatedSubtree) {
+ for (TreeChangeListener<V> listener : listeners) {
+ listener.accept(updatedSubtree);
+ }
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
index 46c032ea..ce065d1d 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfig.java
@@ -22,10 +22,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
import io.vavr.collection.List;
import io.vavr.control.Option;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
@@ -38,27 +34,9 @@ import reactor.core.publisher.Flux;
public class ListenableCbsConfig {
private MerkleTree<String> tree = MerkleTree.emptyWithDefaultDigest(String::getBytes);
- private final Map<List<String>, CompositeTreeChangeListener<String>> pathListeners = new HashMap<>();
- private final Object listenersUpdateMonitor = new Object();
+ private final TreePathListener<String> pathListener = new TreePathListener<>();
private final Object treeUpdateMonitor = new Object();
- public void listen(List<String> path, TreeChangeListener<String> listener) {
- synchronized (listenersUpdateMonitor) {
- CompositeTreeChangeListener<String> compositeListener = pathListeners
- .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>());
- compositeListener.addListener(listener);
- }
- }
-
- public void cancel(List<String> path, TreeChangeListener<String> listener) {
- synchronized (listenersUpdateMonitor) {
- CompositeTreeChangeListener<String> compositeListener = pathListeners.get(path);
- if (compositeListener != null) {
- compositeListener.removeListener(listener);
- }
- }
- }
-
public Flux<Option<MerkleTree<String>>> subtreeChanges(List<String> path) {
return Flux.create(sink -> {
final TreeChangeListener<String> listener = sink::next;
@@ -67,43 +45,26 @@ public class ListenableCbsConfig {
});
}
+ public void listen(List<String> path, TreeChangeListener<String> listener) {
+ pathListener.listen(path, listener);
+ }
+
+ public void cancel(List<String> path, TreeChangeListener<String> listener) {
+ pathListener.cancel(path, listener);
+ }
+
public Disposable subscribeForUpdates(Flux<MerkleTree<String>> updates) {
return updates.subscribe(this::update);
}
public void update(MerkleTree<String> newTree) {
final MerkleTree<String> oldTree;
+
synchronized (treeUpdateMonitor) {
oldTree = tree;
tree = newTree;
}
- for (Map.Entry<List<String>, CompositeTreeChangeListener<String>> entry : pathListeners.entrySet()) {
- final List<String> path = entry.getKey();
- final CompositeTreeChangeListener<String> listeners = entry.getValue();
- if (!newTree.isSame(path, oldTree)) {
- listeners.accept(newTree, path);
- }
- }
- }
-
- private static class CompositeTreeChangeListener<V> implements TreeChangeListener<V> {
-
- private final Collection<TreeChangeListener<V>> listeners = new HashSet<>();
-
- void addListener(TreeChangeListener<V> listener) {
- listeners.add(listener);
- }
-
- void removeListener(TreeChangeListener<V> listener) {
- listeners.remove(listener);
- }
-
- @Override
- public void accept(Option<MerkleTree<V>> updatedSubtree) {
- for (TreeChangeListener<V> listener : listeners) {
- listener.accept(updatedSubtree);
- }
- }
+ pathListener.update(oldTree, newTree);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java
new file mode 100644
index 00000000..6c578317
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/TreePathListener.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+
+import io.vavr.collection.List;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @since 1.1.2
+ */
+class TreePathListener<T> {
+
+ private final Map<List<String>, CompositeTreeChangeListener<T>> pathListeners = new HashMap<>();
+
+ public synchronized void listen(List<String> path, TreeChangeListener<T> listener) {
+ CompositeTreeChangeListener<T> compositeListener = pathListeners
+ .computeIfAbsent(path, p -> new CompositeTreeChangeListener<>());
+
+ compositeListener.addListener(listener);
+ }
+
+ public synchronized void cancel(List<String> path, TreeChangeListener<T> listener) {
+ CompositeTreeChangeListener<T> compositeListener = pathListeners.get(path);
+ if (compositeListener != null) {
+ compositeListener.removeListener(listener);
+ }
+ }
+
+ public void update(MerkleTree<T> oldTree, MerkleTree<T> newTree) {
+ pathListeners.forEach((path, listener) -> {
+ if (!newTree.isSame(path, oldTree)) {
+ listener.accept(newTree, path);
+ }
+ });
+ }
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java
index 3e77251d..b3ef1bd5 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/ListenableCbsConfigTest.java
@@ -25,8 +25,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.vavr.collection.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.ListenableCbsConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.test.StepVerifier;
@@ -135,4 +133,4 @@ class ListenableCbsConfigTest {
.verifyComplete();
}
-} \ No newline at end of file
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java
index 2a9a7fcd..dee1150e 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeTest.java
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>