summaryrefslogtreecommitdiffstats
path: root/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams
diff options
context:
space:
mode:
Diffstat (limited to 'netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams')
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java142
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java153
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java161
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java47
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java77
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java53
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java15
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java425
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java181
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java230
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java152
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java185
-rw-r--r--netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java31
13 files changed, 1852 insertions, 0 deletions
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
new file mode 100644
index 0000000..76827d4
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import io.netty.channel.Channel;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Features of subscribing part of both notifications.
+ */
+abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+
+ private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
+ private final EventBus eventBus;
+
+ @SuppressWarnings("rawtypes")
+ private EventBusChangeRecorder eventBusChangeRecorder;
+ @SuppressWarnings("rawtypes")
+ private ListenerRegistration registration;
+
+ /**
+ * Creating {@link EventBus}.
+ */
+ protected AbstractCommonSubscriber() {
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ }
+
+ @Override
+ public final boolean hasSubscribers() {
+ return !this.subscribers.isEmpty();
+ }
+
+ @Override
+ public final Set<Channel> getSubscribers() {
+ return this.subscribers;
+ }
+
+ @Override
+ public final void close() {
+ if (registration != null) {
+ this.registration.close();
+ this.registration = null;
+ }
+
+ unregister();
+ }
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void addSubscriber(final Channel subscriber) {
+ if (!subscriber.isActive()) {
+ LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
+ }
+ final Event event = new Event(EventType.REGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber subscriber channel
+ */
+ public void removeSubscriber(final Channel subscriber) {
+ LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ final Event event = new Event(EventType.DEREGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Sets {@link ListenerRegistration} registration.
+ *
+ * @param registration
+ * DOMDataChangeListener registration
+ */
+ @SuppressWarnings("rawtypes")
+ public void setRegistration(final ListenerRegistration registration) {
+ this.registration = registration;
+ }
+
+ /**
+ * Checks if {@link ListenerRegistration} registration exist.
+ *
+ * @return True if exist, false otherwise.
+ */
+ public boolean isListening() {
+ return this.registration != null;
+ }
+
+ /**
+ * Creating and registering {@link EventBusChangeRecorder} of specific
+ * listener on {@link EventBus}.
+ *
+ * @param listener
+ * specific listener of notifications
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected <T extends BaseListenerInterface> void register(final T listener) {
+ this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
+ this.eventBus.register(this.eventBusChangeRecorder);
+ }
+
+ /**
+ * Post event to event bus.
+ *
+ * @param event
+ * data of incoming notifications
+ */
+ protected void post(final Event event) {
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Removes all subscribers and unregisters event bus change recorder form
+ * event bus.
+ */
+ protected void unregister() {
+ this.subscribers.clear();
+ this.eventBus.unregister(this.eventBusChangeRecorder);
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java
new file mode 100644
index 0000000..7e7bc1a
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import org.opendaylight.yangtools.util.xml.UntrustedXML;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Abstract class for processing and preparing data.
+ *
+ */
+abstract class AbstractNotificationsData {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class);
+ private static final TransformerFactory TF = TransformerFactory.newInstance();
+ private static final XMLOutputFactory OF = XMLOutputFactory.newFactory();
+
+ /**
+ * Formats data specified by RFC3339.
+ *
+ * @param now time stamp
+ * @return Data specified by RFC3339.
+ */
+ protected static String toRFC3339(final Instant now) {
+ return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(OffsetDateTime.ofInstant(now, ZoneId.systemDefault()));
+ }
+
+ /**
+ * Creates {@link Document} document.
+ *
+ * @return {@link Document} document.
+ */
+ protected static Document createDocument() {
+ return UntrustedXML.newDocumentBuilder().newDocument();
+ }
+
+ /**
+ * Write normalized node to {@link DOMResult}.
+ *
+ * @param normalized
+ * data
+ * @param inference
+ * SchemaInferenceStack state for the data
+ * @return {@link DOMResult}
+ */
+ protected DOMResult writeNormalizedNode(final NormalizedNode normalized, final Inference inference)
+ throws IOException, XMLStreamException {
+ final Document doc = UntrustedXML.newDocumentBuilder().newDocument();
+ final DOMResult result = new DOMResult(doc);
+ NormalizedNodeWriter normalizedNodeWriter = null;
+ NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+ XMLStreamWriter writer = null;
+
+ try {
+ writer = OF.createXMLStreamWriter(result);
+ normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, inference);
+ normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+ normalizedNodeWriter.write(normalized);
+
+ normalizedNodeWriter.flush();
+ } finally {
+ if (normalizedNodeWriter != null) {
+ normalizedNodeWriter.close();
+ }
+ if (normalizedNodeStreamWriter != null) {
+ normalizedNodeStreamWriter.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Generating base element of every notification.
+ *
+ * @param doc
+ * base {@link Document}
+ * @return element of {@link Document}
+ */
+ protected Element basePartDoc(final Document doc) {
+ final Element notificationElement =
+ doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification");
+
+ doc.appendChild(notificationElement);
+
+ final Element eventTimeElement = doc.createElement("eventTime");
+ eventTimeElement.setTextContent(toRFC3339(Instant.now()));
+ notificationElement.appendChild(eventTimeElement);
+
+ return notificationElement;
+ }
+
+ /**
+ * Generating of {@link Document} transforming to string.
+ *
+ * @param doc
+ * {@link Document} with data
+ * @return - string from {@link Document}
+ */
+ protected String transformDoc(final Document doc) {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ try {
+ final Transformer transformer = TF.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(doc), new StreamResult(out));
+ } catch (final TransformerException e) {
+ // FIXME: this should raise an exception
+ final String msg = "Error during transformation of Document into String";
+ LOG.error(msg, e);
+ return msg;
+ }
+
+ return new String(out.toByteArray(), StandardCharsets.UTF_8);
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java
new file mode 100644
index 0000000..4697646
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.StringReader;
+import java.time.Instant;
+import java.util.Optional;
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+/**
+ * Features of query parameters part of both notifications.
+ *
+ */
+abstract class AbstractQueryParams extends AbstractNotificationsData {
+ // FIXME: BUG-7956: switch to using UntrustedXML
+ private static final DocumentBuilderFactory DBF;
+
+ static {
+ final DocumentBuilderFactory f = DocumentBuilderFactory.newInstance();
+ f.setCoalescing(true);
+ f.setExpandEntityReferences(false);
+ f.setIgnoringElementContentWhitespace(true);
+ f.setIgnoringComments(true);
+ f.setXIncludeAware(false);
+ try {
+ f.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+ f.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+ f.setFeature("http://xml.org/sax/features/external-general-entities", false);
+ f.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+ } catch (final ParserConfigurationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ DBF = f;
+ }
+
+ // FIXME: these should be final
+ private Instant start = null;
+ private Instant stop = null;
+ private String filter = null;
+ private boolean leafNodesOnly = false;
+ private boolean skipNotificationData = false;
+
+ @VisibleForTesting
+ public final Instant getStart() {
+ return start;
+ }
+
+ /**
+ * Set query parameters for listener.
+ *
+ * @param start
+ * start-time of getting notification
+ * @param stop
+ * stop-time of getting notification
+ * @param filter
+ * indicate which subset of all possible events are of interest
+ * @param leafNodesOnly
+ * if true, notifications will contain changes to leaf nodes only
+ * @param skipNotificationData
+ * if true, notification will not contain changed data
+ */
+ @SuppressWarnings("checkstyle:hiddenField")
+ public void setQueryParams(final Instant start, final Optional<Instant> stop, final Optional<String> filter,
+ final boolean leafNodesOnly, final boolean skipNotificationData) {
+ this.start = requireNonNull(start);
+ this.stop = stop.orElse(null);
+ this.filter = filter.orElse(null);
+ this.leafNodesOnly = leafNodesOnly;
+ this.skipNotificationData = skipNotificationData;
+ }
+
+ /**
+ * Check whether this query should only notify about leaf node changes.
+ *
+ * @return true if this query should only notify about leaf node changes
+ */
+ public boolean getLeafNodesOnly() {
+ return leafNodesOnly;
+ }
+
+ /**
+ * Check whether this query should notify changes without data.
+ *
+ * @return true if this query should notify about changes with data
+ */
+ public boolean isSkipNotificationData() {
+ return skipNotificationData;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ <T extends BaseListenerInterface> boolean checkStartStop(final Instant now, final T listener) {
+ if (this.stop != null) {
+ if (this.start.compareTo(now) < 0 && this.stop.compareTo(now) > 0) {
+ return true;
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ return true;
+ }
+ } else {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check if is filter used and then prepare and post data do client.
+ *
+ * @param xml data of notification
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ boolean checkFilter(final String xml) {
+ if (this.filter == null) {
+ return true;
+ }
+
+ try {
+ return parseFilterParam(xml);
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem while parsing filter.", e);
+ }
+ }
+
+ /**
+ * Parse and evaluate filter value by xml.
+ *
+ * @return true or false - depends on filter expression and data of
+ * notifiaction
+ * @throws Exception if operation fails
+ */
+ private boolean parseFilterParam(final String xml) throws Exception {
+ final Document docOfXml = DBF.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+ // FIXME: BUG-7956: xPath.setNamespaceContext(nsContext);
+ return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java
new file mode 100644
index 0000000..4804e16
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+import java.util.Set;
+
+/**
+ * Base interface for both listeners({@link ListenerAdapter},
+ * {@link NotificationListenerAdapter}).
+ */
+interface BaseListenerInterface extends AutoCloseable {
+
+ /**
+ * Return all subscribers of listener.
+ *
+ * @return set of subscribers
+ */
+ Set<Channel> getSubscribers();
+
+ /**
+ * Checks if exists at least one {@link Channel} subscriber.
+ *
+ * @return True if exist at least one {@link Channel} subscriber, false
+ * otherwise.
+ */
+ boolean hasSubscribers();
+
+ /**
+ * Get name of stream.
+ *
+ * @return stream name
+ */
+ String getStreamName();
+
+ /**
+ * Get output type.
+ *
+ * @return outputType
+ */
+ String getOutputType();
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java
new file mode 100644
index 0000000..486d807
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+
+/**
+ * Represents event of specific {@link EventType} type, holds data and
+ * {@link Channel} subscriber.
+ */
+class Event {
+ private final EventType type;
+ private Channel subscriber;
+ private String data;
+
+ /**
+ * Creates new event specified by {@link EventType} type.
+ *
+ * @param type
+ * EventType
+ */
+ Event(final EventType type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the {@link Channel} subscriber.
+ *
+ * @return Channel
+ */
+ public Channel getSubscriber() {
+ return this.subscriber;
+ }
+
+ /**
+ * Sets subscriber for event.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void setSubscriber(final Channel subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ /**
+ * Gets event String.
+ *
+ * @return String representation of event data.
+ */
+ public String getData() {
+ return this.data;
+ }
+
+ /**
+ * Sets event data.
+ *
+ * @param data
+ * String.
+ */
+ public void setData(final String data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets event type.
+ *
+ * @return The type of the event.
+ */
+ public EventType getType() {
+ return this.type;
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java
new file mode 100644
index 0000000..11e5656
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.Subscribe;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class EventBusChangeRecorder<T extends BaseListenerInterface> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class);
+ private final T listener;
+
+ /**
+ * Event bus change recorder of specific listener of notifications.
+ *
+ * @param listener
+ * specific listener
+ */
+ EventBusChangeRecorder(final T listener) {
+ this.listener = listener;
+ }
+
+ @Subscribe
+ public void recordCustomerChange(final Event event) {
+ if (event.getType() == EventType.REGISTER) {
+ final Channel subscriber = event.getSubscriber();
+ if (!this.listener.getSubscribers().contains(subscriber)) {
+ this.listener.getSubscribers().add(subscriber);
+ }
+ } else if (event.getType() == EventType.DEREGISTER) {
+ this.listener.getSubscribers().remove(event.getSubscriber());
+ Notificator.removeListenerIfNoSubscriberExists(this.listener);
+ } else if (event.getType() == EventType.NOTIFY) {
+ for (final Channel subscriber : this.listener.getSubscribers()) {
+ if (subscriber.isActive()) {
+ LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+ } else {
+ LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+ this.listener.getSubscribers().remove(subscriber);
+ }
+ }
+ }
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java
new file mode 100644
index 0000000..ba7c2a3
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+/**
+ * Type of the event.
+ */
+enum EventType {
+ REGISTER, DEREGISTER, NOTIFY
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java
new file mode 100644
index 0000000..fc85862
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright (c) 2014, 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import org.json.XML;
+import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+/**
+ * {@link ListenerAdapter} is responsible to track events, which occurred by
+ * changing data in data source.
+ */
+public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
+ private static final String DATA_CHANGE_EVENT = "data-change-event";
+ private static final String PATH = "path";
+ private static final String OPERATION = "operation";
+
+ private final ControllerContext controllerContext;
+ private final YangInstanceIdentifier path;
+ private final String streamName;
+ private final NotificationOutputType outputType;
+
+ /**
+ * Creates new {@link ListenerAdapter} listener specified by path and stream
+ * name and register for subscribing.
+ *
+ * @param path
+ * Path to data in data store.
+ * @param streamName
+ * The name of the stream.
+ * @param outputType
+ * Type of output on notification (JSON, XML)
+ */
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "non-final for testing")
+ ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType, final ControllerContext controllerContext) {
+ this.outputType = requireNonNull(outputType);
+ this.path = requireNonNull(path);
+ checkArgument(streamName != null && !streamName.isEmpty());
+ this.streamName = streamName;
+ this.controllerContext = controllerContext;
+ register(this);
+ }
+
+ @Override
+ public void onInitialData() {
+ // No-op
+ }
+
+ @Override
+ public void onDataTreeChanged(final List<DataTreeCandidate> dataTreeCandidates) {
+ final Instant now = Instant.now();
+ if (!checkStartStop(now, this)) {
+ return;
+ }
+
+ final String xml = prepareXml(dataTreeCandidates);
+ if (checkFilter(xml)) {
+ prepareAndPostData(xml);
+ }
+ }
+
+ /**
+ * Gets the name of the stream.
+ *
+ * @return The name of the stream.
+ */
+ @Override
+ public String getStreamName() {
+ return streamName;
+ }
+
+ @Override
+ public String getOutputType() {
+ return outputType.getName();
+ }
+
+ /**
+ * Get path pointed to data in data store.
+ *
+ * @return Path pointed to data in data store.
+ */
+ public YangInstanceIdentifier getPath() {
+ return path;
+ }
+
+ /**
+ * Prepare data of notification and data to client.
+ *
+ * @param xml data
+ */
+ private void prepareAndPostData(final String xml) {
+ final Event event = new Event(EventType.NOTIFY);
+ if (outputType.equals(NotificationOutputType.JSON)) {
+ event.setData(XML.toJSONObject(xml).toString());
+ } else {
+ event.setData(xml);
+ }
+ post(event);
+ }
+
+ /**
+ * Tracks events of data change by customer.
+ */
+
+ /**
+ * Prepare data in printable form and transform it to String.
+ *
+ * @return Data in printable form.
+ */
+ private String prepareXml(final Collection<DataTreeCandidate> candidates) {
+ final EffectiveModelContext schemaContext = controllerContext.getGlobalSchema();
+ final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
+ final Document doc = createDocument();
+ final Element notificationElement = basePartDoc(doc);
+
+ final Element dataChangedNotificationEventElement = doc.createElementNS(
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+
+ addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, candidates,
+ schemaContext, dataContextTree);
+ notificationElement.appendChild(dataChangedNotificationEventElement);
+ return transformDoc(doc);
+ }
+
+ /**
+ * Adds values to data changed notification event element.
+ *
+ * @param doc
+ * {@link Document}
+ * @param dataChangedNotificationEventElement
+ * {@link Element}
+ * @param dataTreeCandidates
+ * {@link DataTreeCandidate}
+ */
+ private void addValuesToDataChangedNotificationEventElement(final Document doc,
+ final Element dataChangedNotificationEventElement,
+ final Collection<DataTreeCandidate> dataTreeCandidates,
+ final EffectiveModelContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+
+ for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) {
+ DataTreeCandidateNode candidateNode = dataTreeCandidate.getRootNode();
+ if (candidateNode == null) {
+ continue;
+ }
+ YangInstanceIdentifier yiid = dataTreeCandidate.getRootPath();
+
+ boolean isSkipNotificationData = this.isSkipNotificationData();
+ if (isSkipNotificationData) {
+ createCreatedChangedDataChangeEventElementWithoutData(doc,
+ dataChangedNotificationEventElement, dataTreeCandidate.getRootNode());
+ } else {
+ addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, candidateNode,
+ yiid.getParent(), schemaContext, dataSchemaContextTree);
+ }
+ }
+ }
+
+ private void addNodeToDataChangeNotificationEventElement(final Document doc,
+ final Element dataChangedNotificationEventElement, final DataTreeCandidateNode candidateNode,
+ final YangInstanceIdentifier parentYiid, final EffectiveModelContext schemaContext,
+ final DataSchemaContextTree dataSchemaContextTree) {
+
+ Optional<NormalizedNode> optionalNormalizedNode = Optional.empty();
+ switch (candidateNode.getModificationType()) {
+ case APPEARED:
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ optionalNormalizedNode = candidateNode.getDataAfter();
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ optionalNormalizedNode = candidateNode.getDataBefore();
+ break;
+ case UNMODIFIED:
+ default:
+ break;
+ }
+
+ if (optionalNormalizedNode.isEmpty()) {
+ LOG.error("No node present in notification for {}", candidateNode);
+ return;
+ }
+
+ NormalizedNode normalizedNode = optionalNormalizedNode.get();
+ YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid)
+ .append(normalizedNode.getIdentifier()).build();
+
+ boolean isNodeMixin = controllerContext.isNodeMixin(yiid);
+ boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode);
+ if (!isNodeMixin && !isSkippedNonLeaf) {
+ Node node = null;
+ switch (candidateNode.getModificationType()) {
+ case APPEARED:
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ Operation op = candidateNode.getDataBefore().isPresent() ? Operation.UPDATED : Operation.CREATED;
+ node = createCreatedChangedDataChangeEventElement(doc, yiid, normalizedNode, op,
+ schemaContext, dataSchemaContextTree);
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ node = createDataChangeEventElement(doc, yiid, Operation.DELETED);
+ break;
+ case UNMODIFIED:
+ default:
+ break;
+ }
+ if (node != null) {
+ dataChangedNotificationEventElement.appendChild(node);
+ }
+ }
+
+ for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) {
+ addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, childNode,
+ yiid, schemaContext, dataSchemaContextTree);
+ }
+ }
+
+ /**
+ * Creates changed event element from data.
+ *
+ * @param doc
+ * {@link Document}
+ * @param dataPath
+ * Path to data in data store.
+ * @param operation
+ * {@link Operation}
+ * @return {@link Node} node represented by changed event element.
+ */
+ private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier dataPath,
+ final Operation operation) {
+ final Element dataChangeEventElement = doc.createElement(DATA_CHANGE_EVENT);
+ final Element pathElement = doc.createElement(PATH);
+ addPathAsValueToElement(dataPath, pathElement);
+ dataChangeEventElement.appendChild(pathElement);
+
+ final Element operationElement = doc.createElement(OPERATION);
+ operationElement.setTextContent(operation.value);
+ dataChangeEventElement.appendChild(operationElement);
+
+ return dataChangeEventElement;
+ }
+
+ /**
+ * Creates data change notification element without data element.
+ *
+ * @param doc
+ * {@link Document}
+ * @param dataChangedNotificationEventElement
+ * {@link Element}
+ * @param candidateNode
+ * {@link DataTreeCandidateNode}
+ */
+ private void createCreatedChangedDataChangeEventElementWithoutData(final Document doc,
+ final Element dataChangedNotificationEventElement, final DataTreeCandidateNode candidateNode) {
+ final Operation operation;
+ switch (candidateNode.getModificationType()) {
+ case APPEARED:
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ operation = candidateNode.getDataBefore().isPresent() ? Operation.UPDATED : Operation.CREATED;
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ operation = Operation.DELETED;
+ break;
+ case UNMODIFIED:
+ default:
+ return;
+ }
+ Node dataChangeEventElement = createDataChangeEventElement(doc, getPath(), operation);
+ dataChangedNotificationEventElement.appendChild(dataChangeEventElement);
+
+ }
+
+ private Node createCreatedChangedDataChangeEventElement(final Document doc,
+ final YangInstanceIdentifier eventPath, final NormalizedNode normalized, final Operation operation,
+ final EffectiveModelContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ final Element dataChangeEventElement = doc.createElement(DATA_CHANGE_EVENT);
+ final Element pathElement = doc.createElement(PATH);
+ addPathAsValueToElement(eventPath, pathElement);
+ dataChangeEventElement.appendChild(pathElement);
+
+ final Element operationElement = doc.createElement(OPERATION);
+ operationElement.setTextContent(operation.value);
+ dataChangeEventElement.appendChild(operationElement);
+
+ final SchemaInferenceStack stack = dataSchemaContextTree.enterPath(eventPath).orElseThrow().stack();
+ if (!(normalized instanceof MapEntryNode) && !(normalized instanceof UnkeyedListEntryNode)
+ && !stack.isEmpty()) {
+ stack.exit();
+ }
+
+ final var inference = stack.toInference();
+
+ try {
+ final DOMResult domResult = writeNormalizedNode(normalized, inference);
+ final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
+ final Element dataElement = doc.createElement("data");
+ dataElement.appendChild(result);
+ dataChangeEventElement.appendChild(dataElement);
+ } catch (final IOException e) {
+ LOG.error("Error in writer ", e);
+ } catch (final XMLStreamException e) {
+ LOG.error("Error processing stream", e);
+ }
+
+ return dataChangeEventElement;
+ }
+
+ /**
+ * Adds path as value to element.
+ *
+ * @param dataPath
+ * Path to data in data store.
+ * @param element
+ * {@link Element}
+ */
+ @SuppressWarnings("rawtypes")
+ private void addPathAsValueToElement(final YangInstanceIdentifier dataPath, final Element element) {
+ final YangInstanceIdentifier normalizedPath = controllerContext.toXpathRepresentation(dataPath);
+ final StringBuilder textContent = new StringBuilder();
+
+ for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
+ if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
+ continue;
+ }
+ textContent.append("/");
+ writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType());
+ if (pathArgument instanceof NodeIdentifierWithPredicates) {
+ for (final Entry<QName, Object> entry : ((NodeIdentifierWithPredicates) pathArgument).entrySet()) {
+ final QName keyValue = entry.getKey();
+ final String predicateValue = String.valueOf(entry.getValue());
+ textContent.append("[");
+ writeIdentifierWithNamespacePrefix(element, textContent, keyValue);
+ textContent.append("='");
+ textContent.append(predicateValue);
+ textContent.append("'");
+ textContent.append("]");
+ }
+ } else if (pathArgument instanceof NodeWithValue) {
+ textContent.append("[.='");
+ textContent.append(((NodeWithValue) pathArgument).getValue());
+ textContent.append("'");
+ textContent.append("]");
+ }
+ }
+ element.setTextContent(textContent.toString());
+ }
+
+ /**
+ * Writes identifier that consists of prefix and QName.
+ *
+ * @param element
+ * {@link Element}
+ * @param textContent
+ * StringBuilder
+ * @param qualifiedName
+ * QName
+ */
+ private void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
+ final QName qualifiedName) {
+ final Module module = controllerContext.getGlobalSchema().findModule(qualifiedName.getModule())
+ .get();
+
+ textContent.append(module.getName());
+ textContent.append(":");
+ textContent.append(qualifiedName.getLocalName());
+ }
+
+ /**
+ * Consists of three types {@link Operation#CREATED},
+ * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ */
+ private enum Operation {
+ CREATED("created"), UPDATED("updated"), DELETED("deleted");
+
+ private final String value;
+
+ Operation(final String value) {
+ this.value = value;
+ }
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java
new file mode 100644
index 0000000..3061285
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Instant;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+/**
+ * {@link NotificationListenerAdapter} is responsible to track events on notifications.
+ */
+public final class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
+
+ private final ControllerContext controllerContext;
+ private final String streamName;
+ private final Absolute path;
+ private final String outputType;
+
+ /**
+ * Set path of listener and stream name, register event bus.
+ *
+ * @param path
+ * path of notification
+ * @param streamName
+ * stream name of listener
+ * @param outputType
+ * type of output on notification (JSON, XML)
+ */
+ NotificationListenerAdapter(final Absolute path, final String streamName, final String outputType,
+ final ControllerContext controllerContext) {
+ register(this);
+ this.outputType = requireNonNull(outputType);
+ this.path = requireNonNull(path);
+ checkArgument(streamName != null && !streamName.isEmpty());
+ this.streamName = streamName;
+ this.controllerContext = controllerContext;
+ }
+
+ /**
+ * Get outputType of listener.
+ *
+ * @return the outputType
+ */
+ @Override
+ public String getOutputType() {
+ return outputType;
+ }
+
+ @Override
+ public void onNotification(final DOMNotification notification) {
+ final Instant now = Instant.now();
+ if (!checkStartStop(now, this)) {
+ return;
+ }
+
+ final EffectiveModelContext schemaContext = controllerContext.getGlobalSchema();
+ final String xml = prepareXml(schemaContext, notification);
+ if (checkFilter(xml)) {
+ prepareAndPostData(outputType.equals("JSON") ? prepareJson(schemaContext, notification) : xml);
+ }
+ }
+
+ /**
+ * Get stream name of this listener.
+ *
+ * @return {@link String}
+ */
+ @Override
+ public String getStreamName() {
+ return streamName;
+ }
+
+ /**
+ * Get schema path of notification.
+ *
+ * @return {@link Absolute} SchemaNodeIdentifier
+ */
+ public Absolute getSchemaPath() {
+ return path;
+ }
+
+ /**
+ * Prepare data of notification and data to client.
+ *
+ * @param data data
+ */
+ private void prepareAndPostData(final String data) {
+ final Event event = new Event(EventType.NOTIFY);
+ event.setData(data);
+ post(event);
+ }
+
+ /**
+ * Prepare json from notification data.
+ *
+ * @return json as {@link String}
+ */
+ @VisibleForTesting
+ String prepareJson(final EffectiveModelContext schemaContext, final DOMNotification notification) {
+ final JsonObject json = new JsonObject();
+ json.add("ietf-restconf:notification", JsonParser.parseString(writeBodyToString(schemaContext, notification)));
+ json.addProperty("event-time", ListenerAdapter.toRFC3339(Instant.now()));
+ return json.toString();
+ }
+
+ private static String writeBodyToString(final EffectiveModelContext schemaContext,
+ final DOMNotification notification) {
+ final Writer writer = new StringWriter();
+ final NormalizedNodeStreamWriter jsonStream = JSONNormalizedNodeStreamWriter.createExclusiveWriter(
+ JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext),
+ notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
+ final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
+ try {
+ nodeWriter.write(notification.getBody());
+ nodeWriter.close();
+ } catch (final IOException e) {
+ throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
+ }
+ return writer.toString();
+ }
+
+ private String prepareXml(final EffectiveModelContext schemaContext, final DOMNotification notification) {
+ final Document doc = createDocument();
+ final Element notificationElement = basePartDoc(doc);
+
+ final Element notificationEventElement = doc.createElementNS(
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
+ addValuesToNotificationEventElement(doc, notificationEventElement, schemaContext, notification);
+ notificationElement.appendChild(notificationEventElement);
+
+ return transformDoc(doc);
+ }
+
+ private void addValuesToNotificationEventElement(final Document doc, final Element element,
+ final EffectiveModelContext schemaContext, final DOMNotification notification) {
+ try {
+ final DOMResult domResult = writeNormalizedNode(notification.getBody(),
+ SchemaInferenceStack.of(schemaContext, path).toInference());
+ final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
+ final Element dataElement = doc.createElement("notification");
+ dataElement.appendChild(result);
+ element.appendChild(dataElement);
+ } catch (final IOException e) {
+ LOG.error("Error in writer ", e);
+ } catch (final XMLStreamException e) {
+ LOG.error("Error processing stream", e);
+ }
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java
new file mode 100644
index 0000000..5bfa79f
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Notificator} is responsible to create, remove and find
+ * {@link ListenerAdapter} listener.
+ */
+public final class Notificator {
+
+ private static Map<String, ListenerAdapter> dataChangeListener = new ConcurrentHashMap<>();
+ private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName =
+ new ConcurrentHashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(Notificator.class);
+ private static final Lock LOCK = new ReentrantLock();
+
+ private Notificator() {
+ }
+
+ /**
+ * Returns list of all stream names.
+ */
+ public static Set<String> getStreamNames() {
+ return dataChangeListener.keySet();
+ }
+
+ /**
+ * Gets {@link ListenerAdapter} specified by stream name.
+ *
+ * @param streamName
+ * The name of the stream.
+ * @return {@link ListenerAdapter} specified by stream name.
+ */
+ public static ListenerAdapter getListenerFor(final String streamName) {
+ return dataChangeListener.get(streamName);
+ }
+
+ /**
+ * Checks if the listener specified by {@link YangInstanceIdentifier} path exist.
+ *
+ * @param streamName name of the stream
+ * @return True if the listener exist, false otherwise.
+ */
+ public static boolean existListenerFor(final String streamName) {
+ return dataChangeListener.containsKey(streamName);
+ }
+
+ /**
+ * Creates new {@link ListenerAdapter} listener from
+ * {@link YangInstanceIdentifier} path and stream name.
+ *
+ * @param path
+ * Path to data in data repository.
+ * @param streamName
+ * The name of the stream.
+ * @param outputType
+ * Spcific type of output for notifications - XML or JSON
+ * @return New {@link ListenerAdapter} listener from
+ * {@link YangInstanceIdentifier} path and stream name.
+ */
+ public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType, final ControllerContext controllerContext) {
+ final ListenerAdapter listener = new ListenerAdapter(path, streamName, outputType, controllerContext);
+ try {
+ LOCK.lock();
+ dataChangeListener.put(streamName, listener);
+ } finally {
+ LOCK.unlock();
+ }
+ return listener;
+ }
+
+ /**
+ * Looks for listener determined by {@link YangInstanceIdentifier} path and removes it.
+ * Creates String representation of stream name from URI. Removes slash from URI in start and end position.
+ *
+ * @param uri
+ * URI for creation stream name.
+ * @return String representation of stream name.
+ */
+ public static String createStreamNameFromUri(final String uri) {
+ if (uri == null) {
+ return null;
+ }
+ String result = uri;
+ if (result.startsWith("/")) {
+ result = result.substring(1);
+ }
+ if (result.endsWith("/")) {
+ result = result.substring(0, result.length() - 1);
+ }
+ return result;
+ }
+
+ /**
+ * Removes all listeners.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static void removeAllListeners() {
+ for (final ListenerAdapter listener : dataChangeListener.values()) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
+ }
+ }
+ try {
+ LOCK.lock();
+ dataChangeListener = new ConcurrentHashMap<>();
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ /**
+ * Delete {@link ListenerAdapter} listener specified in parameter.
+ *
+ * @param <T>
+ *
+ * @param listener
+ * ListenerAdapter
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static <T extends BaseListenerInterface> void deleteListener(final T listener) {
+ if (listener != null) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
+ }
+ try {
+ LOCK.lock();
+ dataChangeListener.remove(listener.getStreamName());
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * Check if the listener specified by qnames of request exist.
+ *
+ * @param streamName
+ * name of stream
+ * @return True if the listener exist, false otherwise.
+ */
+ public static boolean existNotificationListenerFor(final String streamName) {
+ return notificationListenersByStreamName.containsKey(streamName);
+ }
+
+ /**
+ * Prepare listener for notification ({@link NotificationDefinition}).
+ *
+ * @param paths
+ * paths of notifications
+ * @param streamName
+ * name of stream (generated by paths)
+ * @param outputType
+ * type of output for onNotification - XML or JSON
+ * @return List of {@link NotificationListenerAdapter} by paths
+ */
+ public static List<NotificationListenerAdapter> createNotificationListener(final List<Absolute> paths,
+ final String streamName, final String outputType, final ControllerContext controllerContext) {
+ final List<NotificationListenerAdapter> listListeners = new ArrayList<>();
+ for (final Absolute path : paths) {
+ final NotificationListenerAdapter listener =
+ new NotificationListenerAdapter(path, streamName, outputType, controllerContext);
+ listListeners.add(listener);
+ }
+ try {
+ LOCK.lock();
+ notificationListenersByStreamName.put(streamName, listListeners);
+ } finally {
+ LOCK.unlock();
+ }
+ return listListeners;
+ }
+
+ public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) {
+ if (!listener.hasSubscribers()) {
+ if (listener instanceof NotificationListenerAdapter) {
+ deleteNotificationListener(listener);
+ } else {
+ deleteListener(listener);
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) {
+ if (listener != null) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
+ }
+ try {
+ LOCK.lock();
+ notificationListenersByStreamName.remove(listener.getStreamName());
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
+ return notificationListenersByStreamName.get(streamName);
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java
new file mode 100644
index 0000000..a295c54
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.websockets;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link WebSocketServer} is the singleton responsible for starting and stopping the
+ * web socket server.
+ */
+public final class WebSocketServer implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
+
+ private static final String DEFAULT_ADDRESS = "0.0.0.0";
+
+ private static WebSocketServer instance = null;
+
+ private final String address;
+ private final int port;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+
+ private WebSocketServer(final String address, final int port) {
+ this.address = address;
+ this.port = port;
+ }
+
+ /**
+ * Create singleton instance of {@link WebSocketServer}.
+ *
+ * @param port TCP port used for this server
+ * @return instance of {@link WebSocketServer}
+ */
+ private static WebSocketServer createInstance(final int port) {
+ instance = createInstance(DEFAULT_ADDRESS, port);
+ return instance;
+ }
+
+ public static WebSocketServer createInstance(final String address, final int port) {
+ checkState(instance == null, "createInstance() has already been called");
+ checkArgument(port >= 1024, "Privileged port (below 1024) is not allowed");
+
+ instance = new WebSocketServer(requireNonNull(address, "Address cannot be null."), port);
+ LOG.info("Created WebSocketServer on {}:{}", address, port);
+ return instance;
+ }
+
+ /**
+ * Get the websocket of TCP port.
+ *
+ * @return websocket TCP port
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}.
+ *
+ * @return instance of {@link WebSocketServer}
+ */
+ public static WebSocketServer getInstance() {
+ return requireNonNull(instance, "createInstance() must be called prior to getInstance()");
+ }
+
+ /**
+ * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}.
+ * If an instance doesnt exist create one with the provided fallback port.
+ *
+ * @return instance of {@link WebSocketServer}
+ */
+ public static WebSocketServer getInstance(final int fallbackPort) {
+ if (instance != null) {
+ return instance;
+ }
+
+ LOG.warn("No instance for WebSocketServer found, creating one with a fallback port: {}", fallbackPort);
+ return createInstance(fallbackPort);
+ }
+
+ /**
+ * Destroy the existing instance.
+ */
+ public static void destroyInstance() {
+ checkState(instance != null, "createInstance() must be called prior to destroyInstance()");
+
+ instance.stop();
+ instance = null;
+ LOG.info("Destroyed WebSocketServer.");
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void run() {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ try {
+ final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new WebSocketServerInitializer());
+
+ final Channel channel = serverBootstrap.bind(address, port).sync().channel();
+ LOG.info("Web socket server started at address {}, port {}.", address, port);
+
+ channel.closeFuture().sync();
+ } catch (final InterruptedException e) {
+ LOG.error("Web socket server encountered an error during startup attempt on port {}", port, e);
+ } catch (Throwable throwable) {
+ // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
+ LOG.error("Error while binding to address {}, port {}", address, port, throwable);
+ throw throwable;
+ } finally {
+ stop();
+ }
+ }
+
+ /**
+ * Stops the web socket server and removes all listeners.
+ */
+ private void stop() {
+ LOG.info("Stopping the web socket server instance on port {}", port);
+ Notificator.removeAllListeners();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ bossGroup = null;
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ workerGroup = null;
+ }
+ }
+
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java
new file mode 100644
index 0000000..ed90e3f
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.websockets;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
+import static io.netty.handler.codec.http.HttpUtil.setContentLength;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
+import io.netty.util.CharsetUtil;
+import java.util.List;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle
+ * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
+ */
+public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class);
+
+ private WebSocketServerHandshaker handshaker;
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+ if (msg instanceof FullHttpRequest) {
+ handleHttpRequest(ctx, (FullHttpRequest) msg);
+ } else if (msg instanceof WebSocketFrame) {
+ handleWebSocketFrame(ctx, (WebSocketFrame) msg);
+ }
+ }
+
+ /**
+ * Checks if HTTP request method is GET and if is possible to decode HTTP result of request.
+ *
+ * @param ctx ChannelHandlerContext
+ * @param req FullHttpRequest
+ */
+ private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) {
+ // Handle a bad request.
+ if (!req.decoderResult().isSuccess()) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+ return;
+ }
+
+ // Allow only GET methods.
+ if (req.method() != GET) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+ return;
+ }
+
+ final String streamName = Notificator.createStreamNameFromUri(req.uri());
+ if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.addSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully registered.");
+ } else {
+ LOG.error("Listener for stream with name '{}' was not found.", streamName);
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ }
+ } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if (listeners != null && !listeners.isEmpty()) {
+ for (final NotificationListenerAdapter listener : listeners) {
+ listener.addSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully registered.");
+ }
+ } else {
+ LOG.error("Listener for stream with name '{}' was not found.", streamName);
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ // Handshake
+ final WebSocketServerHandshakerFactory wsFactory =
+ new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
+ null, false);
+ this.handshaker = wsFactory.newHandshaker(req);
+ if (this.handshaker == null) {
+ WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+ } else {
+ this.handshaker.handshake(ctx.channel(), req);
+ }
+ }
+
+ /**
+ * Checks response status, send response and close connection if necessary.
+ *
+ * @param ctx ChannelHandlerContext
+ * @param req HttpRequest
+ * @param res FullHttpResponse
+ */
+ private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
+ final FullHttpResponse res) {
+ // Generate an error page if response getStatus code is not OK (200).
+ final boolean notOkay = !OK.equals(res.status());
+ if (notOkay) {
+ res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8);
+ setContentLength(res, res.content().readableBytes());
+ }
+
+ // Send the response and close the connection if necessary.
+ final ChannelFuture f = ctx.channel().writeAndFlush(res);
+ if (notOkay || !isKeepAlive(req)) {
+ f.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ /**
+ * Handles web socket frame.
+ *
+ * @param ctx {@link ChannelHandlerContext}
+ * @param frame {@link WebSocketFrame}
+ */
+ private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) {
+ if (frame instanceof CloseWebSocketFrame) {
+ this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+ final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+ if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.removeSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully registered.");
+
+ Notificator.removeListenerIfNoSubscriberExists(listener);
+ }
+ } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if (listeners != null && !listeners.isEmpty()) {
+ for (final NotificationListenerAdapter listener : listeners) {
+ listener.removeSubscriber(ctx.channel());
+ }
+ }
+ }
+ return;
+ } else if (frame instanceof PingWebSocketFrame) {
+ ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+ return;
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ ctx.close();
+ }
+
+ /**
+ * Get web socket location from HTTP request.
+ *
+ * @param req HTTP request from which the location will be returned
+ * @return String representation of web socket location.
+ */
+ private static String getWebSocketLocation(final HttpRequest req) {
+ return "ws://" + req.headers().get(HOST) + req.uri();
+ }
+}
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java
new file mode 100644
index 0000000..365e982
--- /dev/null
+++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.streams.websockets;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+/**
+ * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of a {@link io.netty.channel.Channel}
+ * .
+ */
+public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
+
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("codec-http", new HttpServerCodec());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+ pipeline.addLast("handler", new WebSocketServerHandler());
+ }
+
+}