summaryrefslogtreecommitdiffstats
path: root/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
diff options
context:
space:
mode:
Diffstat (limited to 'netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java')
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java142
1 files changed, 142 insertions, 0 deletions
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
new file mode 100644
index 0000000..76827d4
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import io.netty.channel.Channel;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Features of subscribing part of both notifications.
+ */
+abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+
+ private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
+ private final EventBus eventBus;
+
+ @SuppressWarnings("rawtypes")
+ private EventBusChangeRecorder eventBusChangeRecorder;
+ @SuppressWarnings("rawtypes")
+ private ListenerRegistration registration;
+
+ /**
+ * Creating {@link EventBus}.
+ */
+ protected AbstractCommonSubscriber() {
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ }
+
+ @Override
+ public final boolean hasSubscribers() {
+ return !this.subscribers.isEmpty();
+ }
+
+ @Override
+ public final Set<Channel> getSubscribers() {
+ return this.subscribers;
+ }
+
+ @Override
+ public final void close() {
+ if (registration != null) {
+ this.registration.close();
+ this.registration = null;
+ }
+
+ unregister();
+ }
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void addSubscriber(final Channel subscriber) {
+ if (!subscriber.isActive()) {
+ LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
+ }
+ final Event event = new Event(EventType.REGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber subscriber channel
+ */
+ public void removeSubscriber(final Channel subscriber) {
+ LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ final Event event = new Event(EventType.DEREGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Sets {@link ListenerRegistration} registration.
+ *
+ * @param registration
+ * DOMDataChangeListener registration
+ */
+ @SuppressWarnings("rawtypes")
+ public void setRegistration(final ListenerRegistration registration) {
+ this.registration = registration;
+ }
+
+ /**
+ * Checks if {@link ListenerRegistration} registration exist.
+ *
+ * @return True if exist, false otherwise.
+ */
+ public boolean isListening() {
+ return this.registration != null;
+ }
+
+ /**
+ * Creating and registering {@link EventBusChangeRecorder} of specific
+ * listener on {@link EventBus}.
+ *
+ * @param listener
+ * specific listener of notifications
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected <T extends BaseListenerInterface> void register(final T listener) {
+ this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
+ this.eventBus.register(this.eventBusChangeRecorder);
+ }
+
+ /**
+ * Post event to event bus.
+ *
+ * @param event
+ * data of incoming notifications
+ */
+ protected void post(final Event event) {
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Removes all subscribers and unregisters event bus change recorder form
+ * event bus.
+ */
+ protected void unregister() {
+ this.subscribers.clear();
+ this.eventBus.unregister(this.eventBusChangeRecorder);
+ }
+}