diff options
Diffstat (limited to 'netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams')
13 files changed, 1852 insertions, 0 deletions
diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java new file mode 100644 index 0000000..76827d4 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import io.netty.channel.Channel; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Features of subscribing part of both notifications. + */ +abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class); + + private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet(); + private final EventBus eventBus; + + @SuppressWarnings("rawtypes") + private EventBusChangeRecorder eventBusChangeRecorder; + @SuppressWarnings("rawtypes") + private ListenerRegistration registration; + + /** + * Creating {@link EventBus}. + */ + protected AbstractCommonSubscriber() { + this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + } + + @Override + public final boolean hasSubscribers() { + return !this.subscribers.isEmpty(); + } + + @Override + public final Set<Channel> getSubscribers() { + return this.subscribers; + } + + @Override + public final void close() { + if (registration != null) { + this.registration.close(); + this.registration = null; + } + + unregister(); + } + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(final Channel subscriber) { + if (!subscriber.isActive()) { + LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress()); + } + final Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber subscriber channel + */ + public void removeSubscriber(final Channel subscriber) { + LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + final Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * DOMDataChangeListener registration + */ + @SuppressWarnings("rawtypes") + public void setRegistration(final ListenerRegistration registration) { + this.registration = registration; + } + + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ + public boolean isListening() { + return this.registration != null; + } + + /** + * Creating and registering {@link EventBusChangeRecorder} of specific + * listener on {@link EventBus}. + * + * @param listener + * specific listener of notifications + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected <T extends BaseListenerInterface> void register(final T listener) { + this.eventBusChangeRecorder = new EventBusChangeRecorder(listener); + this.eventBus.register(this.eventBusChangeRecorder); + } + + /** + * Post event to event bus. + * + * @param event + * data of incoming notifications + */ + protected void post(final Event event) { + this.eventBus.post(event); + } + + /** + * Removes all subscribers and unregisters event bus change recorder form + * event bus. + */ + protected void unregister() { + this.subscribers.clear(); + this.eventBus.unregister(this.eventBusChangeRecorder); + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java new file mode 100644 index 0000000..7e7bc1a --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import org.opendaylight.yangtools.util.xml.UntrustedXML; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; +import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Abstract class for processing and preparing data. + * + */ +abstract class AbstractNotificationsData { + private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class); + private static final TransformerFactory TF = TransformerFactory.newInstance(); + private static final XMLOutputFactory OF = XMLOutputFactory.newFactory(); + + /** + * Formats data specified by RFC3339. + * + * @param now time stamp + * @return Data specified by RFC3339. + */ + protected static String toRFC3339(final Instant now) { + return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(OffsetDateTime.ofInstant(now, ZoneId.systemDefault())); + } + + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ + protected static Document createDocument() { + return UntrustedXML.newDocumentBuilder().newDocument(); + } + + /** + * Write normalized node to {@link DOMResult}. + * + * @param normalized + * data + * @param inference + * SchemaInferenceStack state for the data + * @return {@link DOMResult} + */ + protected DOMResult writeNormalizedNode(final NormalizedNode normalized, final Inference inference) + throws IOException, XMLStreamException { + final Document doc = UntrustedXML.newDocumentBuilder().newDocument(); + final DOMResult result = new DOMResult(doc); + NormalizedNodeWriter normalizedNodeWriter = null; + NormalizedNodeStreamWriter normalizedNodeStreamWriter = null; + XMLStreamWriter writer = null; + + try { + writer = OF.createXMLStreamWriter(result); + normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, inference); + normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter); + + normalizedNodeWriter.write(normalized); + + normalizedNodeWriter.flush(); + } finally { + if (normalizedNodeWriter != null) { + normalizedNodeWriter.close(); + } + if (normalizedNodeStreamWriter != null) { + normalizedNodeStreamWriter.close(); + } + if (writer != null) { + writer.close(); + } + } + + return result; + } + + /** + * Generating base element of every notification. + * + * @param doc + * base {@link Document} + * @return element of {@link Document} + */ + protected Element basePartDoc(final Document doc) { + final Element notificationElement = + doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification"); + + doc.appendChild(notificationElement); + + final Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(toRFC3339(Instant.now())); + notificationElement.appendChild(eventTimeElement); + + return notificationElement; + } + + /** + * Generating of {@link Document} transforming to string. + * + * @param doc + * {@link Document} with data + * @return - string from {@link Document} + */ + protected String transformDoc(final Document doc) { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try { + final Transformer transformer = TF.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult(out)); + } catch (final TransformerException e) { + // FIXME: this should raise an exception + final String msg = "Error during transformation of Document into String"; + LOG.error(msg, e); + return msg; + } + + return new String(out.toByteArray(), StandardCharsets.UTF_8); + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java new file mode 100644 index 0000000..4697646 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import java.io.StringReader; +import java.time.Instant; +import java.util.Optional; +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; + +/** + * Features of query parameters part of both notifications. + * + */ +abstract class AbstractQueryParams extends AbstractNotificationsData { + // FIXME: BUG-7956: switch to using UntrustedXML + private static final DocumentBuilderFactory DBF; + + static { + final DocumentBuilderFactory f = DocumentBuilderFactory.newInstance(); + f.setCoalescing(true); + f.setExpandEntityReferences(false); + f.setIgnoringElementContentWhitespace(true); + f.setIgnoringComments(true); + f.setXIncludeAware(false); + try { + f.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + f.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + f.setFeature("http://xml.org/sax/features/external-general-entities", false); + f.setFeature("http://xml.org/sax/features/external-parameter-entities", false); + } catch (final ParserConfigurationException e) { + throw new ExceptionInInitializerError(e); + } + DBF = f; + } + + // FIXME: these should be final + private Instant start = null; + private Instant stop = null; + private String filter = null; + private boolean leafNodesOnly = false; + private boolean skipNotificationData = false; + + @VisibleForTesting + public final Instant getStart() { + return start; + } + + /** + * Set query parameters for listener. + * + * @param start + * start-time of getting notification + * @param stop + * stop-time of getting notification + * @param filter + * indicate which subset of all possible events are of interest + * @param leafNodesOnly + * if true, notifications will contain changes to leaf nodes only + * @param skipNotificationData + * if true, notification will not contain changed data + */ + @SuppressWarnings("checkstyle:hiddenField") + public void setQueryParams(final Instant start, final Optional<Instant> stop, final Optional<String> filter, + final boolean leafNodesOnly, final boolean skipNotificationData) { + this.start = requireNonNull(start); + this.stop = stop.orElse(null); + this.filter = filter.orElse(null); + this.leafNodesOnly = leafNodesOnly; + this.skipNotificationData = skipNotificationData; + } + + /** + * Check whether this query should only notify about leaf node changes. + * + * @return true if this query should only notify about leaf node changes + */ + public boolean getLeafNodesOnly() { + return leafNodesOnly; + } + + /** + * Check whether this query should notify changes without data. + * + * @return true if this query should notify about changes with data + */ + public boolean isSkipNotificationData() { + return skipNotificationData; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + <T extends BaseListenerInterface> boolean checkStartStop(final Instant now, final T listener) { + if (this.stop != null) { + if (this.start.compareTo(now) < 0 && this.stop.compareTo(now) > 0) { + return true; + } + if (this.stop.compareTo(now) < 0) { + try { + listener.close(); + } catch (final Exception e) { + throw new RestconfDocumentedException("Problem with unregister listener." + e); + } + } + } else if (this.start != null) { + if (this.start.compareTo(now) < 0) { + this.start = null; + return true; + } + } else { + return true; + } + return false; + } + + /** + * Check if is filter used and then prepare and post data do client. + * + * @param xml data of notification + */ + @SuppressWarnings("checkstyle:IllegalCatch") + boolean checkFilter(final String xml) { + if (this.filter == null) { + return true; + } + + try { + return parseFilterParam(xml); + } catch (final Exception e) { + throw new RestconfDocumentedException("Problem while parsing filter.", e); + } + } + + /** + * Parse and evaluate filter value by xml. + * + * @return true or false - depends on filter expression and data of + * notifiaction + * @throws Exception if operation fails + */ + private boolean parseFilterParam(final String xml) throws Exception { + final Document docOfXml = DBF.newDocumentBuilder().parse(new InputSource(new StringReader(xml))); + final XPath xPath = XPathFactory.newInstance().newXPath(); + // FIXME: BUG-7956: xPath.setNamespaceContext(nsContext); + return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN); + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java new file mode 100644 index 0000000..4804e16 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import io.netty.channel.Channel; +import java.util.Set; + +/** + * Base interface for both listeners({@link ListenerAdapter}, + * {@link NotificationListenerAdapter}). + */ +interface BaseListenerInterface extends AutoCloseable { + + /** + * Return all subscribers of listener. + * + * @return set of subscribers + */ + Set<Channel> getSubscribers(); + + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false + * otherwise. + */ + boolean hasSubscribers(); + + /** + * Get name of stream. + * + * @return stream name + */ + String getStreamName(); + + /** + * Get output type. + * + * @return outputType + */ + String getOutputType(); +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java new file mode 100644 index 0000000..486d807 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import io.netty.channel.Channel; + +/** + * Represents event of specific {@link EventType} type, holds data and + * {@link Channel} subscriber. + */ +class Event { + private final EventType type; + private Channel subscriber; + private String data; + + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + Event(final EventType type) { + this.type = type; + } + + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ + public Channel getSubscriber() { + return this.subscriber; + } + + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(final Channel subscriber) { + this.subscriber = subscriber; + } + + /** + * Gets event String. + * + * @return String representation of event data. + */ + public String getData() { + return this.data; + } + + /** + * Sets event data. + * + * @param data + * String. + */ + public void setData(final String data) { + this.data = data; + } + + /** + * Gets event type. + * + * @return The type of the event. + */ + public EventType getType() { + return this.type; + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java new file mode 100644 index 0000000..11e5656 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import com.google.common.eventbus.Subscribe; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class EventBusChangeRecorder<T extends BaseListenerInterface> { + + private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class); + private final T listener; + + /** + * Event bus change recorder of specific listener of notifications. + * + * @param listener + * specific listener + */ + EventBusChangeRecorder(final T listener) { + this.listener = listener; + } + + @Subscribe + public void recordCustomerChange(final Event event) { + if (event.getType() == EventType.REGISTER) { + final Channel subscriber = event.getSubscriber(); + if (!this.listener.getSubscribers().contains(subscriber)) { + this.listener.getSubscribers().add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + this.listener.getSubscribers().remove(event.getSubscriber()); + Notificator.removeListenerIfNoSubscriberExists(this.listener); + } else if (event.getType() == EventType.NOTIFY) { + for (final Channel subscriber : this.listener.getSubscribers()) { + if (subscriber.isActive()) { + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + } else { + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + this.listener.getSubscribers().remove(subscriber); + } + } + } + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java new file mode 100644 index 0000000..ba7c2a3 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +/** + * Type of the event. + */ +enum EventType { + REGISTER, DEREGISTER, NOTIFY +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java new file mode 100644 index 0000000..fc85862 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java @@ -0,0 +1,425 @@ +/* + * Copyright (c) 2014, 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.Optional; +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.dom.DOMResult; +import org.json.XML; +import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; +import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +/** + * {@link ListenerAdapter} is responsible to track events, which occurred by + * changing data in data source. + */ +public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); + private static final String DATA_CHANGE_EVENT = "data-change-event"; + private static final String PATH = "path"; + private static final String OPERATION = "operation"; + + private final ControllerContext controllerContext; + private final YangInstanceIdentifier path; + private final String streamName; + private final NotificationOutputType outputType; + + /** + * Creates new {@link ListenerAdapter} listener specified by path and stream + * name and register for subscribing. + * + * @param path + * Path to data in data store. + * @param streamName + * The name of the stream. + * @param outputType + * Type of output on notification (JSON, XML) + */ + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "non-final for testing") + ListenerAdapter(final YangInstanceIdentifier path, final String streamName, + final NotificationOutputType outputType, final ControllerContext controllerContext) { + this.outputType = requireNonNull(outputType); + this.path = requireNonNull(path); + checkArgument(streamName != null && !streamName.isEmpty()); + this.streamName = streamName; + this.controllerContext = controllerContext; + register(this); + } + + @Override + public void onInitialData() { + // No-op + } + + @Override + public void onDataTreeChanged(final List<DataTreeCandidate> dataTreeCandidates) { + final Instant now = Instant.now(); + if (!checkStartStop(now, this)) { + return; + } + + final String xml = prepareXml(dataTreeCandidates); + if (checkFilter(xml)) { + prepareAndPostData(xml); + } + } + + /** + * Gets the name of the stream. + * + * @return The name of the stream. + */ + @Override + public String getStreamName() { + return streamName; + } + + @Override + public String getOutputType() { + return outputType.getName(); + } + + /** + * Get path pointed to data in data store. + * + * @return Path pointed to data in data store. + */ + public YangInstanceIdentifier getPath() { + return path; + } + + /** + * Prepare data of notification and data to client. + * + * @param xml data + */ + private void prepareAndPostData(final String xml) { + final Event event = new Event(EventType.NOTIFY); + if (outputType.equals(NotificationOutputType.JSON)) { + event.setData(XML.toJSONObject(xml).toString()); + } else { + event.setData(xml); + } + post(event); + } + + /** + * Tracks events of data change by customer. + */ + + /** + * Prepare data in printable form and transform it to String. + * + * @return Data in printable form. + */ + private String prepareXml(final Collection<DataTreeCandidate> candidates) { + final EffectiveModelContext schemaContext = controllerContext.getGlobalSchema(); + final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext); + final Document doc = createDocument(); + final Element notificationElement = basePartDoc(doc); + + final Element dataChangedNotificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); + + addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, candidates, + schemaContext, dataContextTree); + notificationElement.appendChild(dataChangedNotificationEventElement); + return transformDoc(doc); + } + + /** + * Adds values to data changed notification event element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param dataTreeCandidates + * {@link DataTreeCandidate} + */ + private void addValuesToDataChangedNotificationEventElement(final Document doc, + final Element dataChangedNotificationEventElement, + final Collection<DataTreeCandidate> dataTreeCandidates, + final EffectiveModelContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + + for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) { + DataTreeCandidateNode candidateNode = dataTreeCandidate.getRootNode(); + if (candidateNode == null) { + continue; + } + YangInstanceIdentifier yiid = dataTreeCandidate.getRootPath(); + + boolean isSkipNotificationData = this.isSkipNotificationData(); + if (isSkipNotificationData) { + createCreatedChangedDataChangeEventElementWithoutData(doc, + dataChangedNotificationEventElement, dataTreeCandidate.getRootNode()); + } else { + addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, candidateNode, + yiid.getParent(), schemaContext, dataSchemaContextTree); + } + } + } + + private void addNodeToDataChangeNotificationEventElement(final Document doc, + final Element dataChangedNotificationEventElement, final DataTreeCandidateNode candidateNode, + final YangInstanceIdentifier parentYiid, final EffectiveModelContext schemaContext, + final DataSchemaContextTree dataSchemaContextTree) { + + Optional<NormalizedNode> optionalNormalizedNode = Optional.empty(); + switch (candidateNode.getModificationType()) { + case APPEARED: + case SUBTREE_MODIFIED: + case WRITE: + optionalNormalizedNode = candidateNode.getDataAfter(); + break; + case DELETE: + case DISAPPEARED: + optionalNormalizedNode = candidateNode.getDataBefore(); + break; + case UNMODIFIED: + default: + break; + } + + if (optionalNormalizedNode.isEmpty()) { + LOG.error("No node present in notification for {}", candidateNode); + return; + } + + NormalizedNode normalizedNode = optionalNormalizedNode.get(); + YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid) + .append(normalizedNode.getIdentifier()).build(); + + boolean isNodeMixin = controllerContext.isNodeMixin(yiid); + boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode); + if (!isNodeMixin && !isSkippedNonLeaf) { + Node node = null; + switch (candidateNode.getModificationType()) { + case APPEARED: + case SUBTREE_MODIFIED: + case WRITE: + Operation op = candidateNode.getDataBefore().isPresent() ? Operation.UPDATED : Operation.CREATED; + node = createCreatedChangedDataChangeEventElement(doc, yiid, normalizedNode, op, + schemaContext, dataSchemaContextTree); + break; + case DELETE: + case DISAPPEARED: + node = createDataChangeEventElement(doc, yiid, Operation.DELETED); + break; + case UNMODIFIED: + default: + break; + } + if (node != null) { + dataChangedNotificationEventElement.appendChild(node); + } + } + + for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) { + addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, childNode, + yiid, schemaContext, dataSchemaContextTree); + } + } + + /** + * Creates changed event element from data. + * + * @param doc + * {@link Document} + * @param dataPath + * Path to data in data store. + * @param operation + * {@link Operation} + * @return {@link Node} node represented by changed event element. + */ + private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier dataPath, + final Operation operation) { + final Element dataChangeEventElement = doc.createElement(DATA_CHANGE_EVENT); + final Element pathElement = doc.createElement(PATH); + addPathAsValueToElement(dataPath, pathElement); + dataChangeEventElement.appendChild(pathElement); + + final Element operationElement = doc.createElement(OPERATION); + operationElement.setTextContent(operation.value); + dataChangeEventElement.appendChild(operationElement); + + return dataChangeEventElement; + } + + /** + * Creates data change notification element without data element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param candidateNode + * {@link DataTreeCandidateNode} + */ + private void createCreatedChangedDataChangeEventElementWithoutData(final Document doc, + final Element dataChangedNotificationEventElement, final DataTreeCandidateNode candidateNode) { + final Operation operation; + switch (candidateNode.getModificationType()) { + case APPEARED: + case SUBTREE_MODIFIED: + case WRITE: + operation = candidateNode.getDataBefore().isPresent() ? Operation.UPDATED : Operation.CREATED; + break; + case DELETE: + case DISAPPEARED: + operation = Operation.DELETED; + break; + case UNMODIFIED: + default: + return; + } + Node dataChangeEventElement = createDataChangeEventElement(doc, getPath(), operation); + dataChangedNotificationEventElement.appendChild(dataChangeEventElement); + + } + + private Node createCreatedChangedDataChangeEventElement(final Document doc, + final YangInstanceIdentifier eventPath, final NormalizedNode normalized, final Operation operation, + final EffectiveModelContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + final Element dataChangeEventElement = doc.createElement(DATA_CHANGE_EVENT); + final Element pathElement = doc.createElement(PATH); + addPathAsValueToElement(eventPath, pathElement); + dataChangeEventElement.appendChild(pathElement); + + final Element operationElement = doc.createElement(OPERATION); + operationElement.setTextContent(operation.value); + dataChangeEventElement.appendChild(operationElement); + + final SchemaInferenceStack stack = dataSchemaContextTree.enterPath(eventPath).orElseThrow().stack(); + if (!(normalized instanceof MapEntryNode) && !(normalized instanceof UnkeyedListEntryNode) + && !stack.isEmpty()) { + stack.exit(); + } + + final var inference = stack.toInference(); + + try { + final DOMResult domResult = writeNormalizedNode(normalized, inference); + final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); + final Element dataElement = doc.createElement("data"); + dataElement.appendChild(result); + dataChangeEventElement.appendChild(dataElement); + } catch (final IOException e) { + LOG.error("Error in writer ", e); + } catch (final XMLStreamException e) { + LOG.error("Error processing stream", e); + } + + return dataChangeEventElement; + } + + /** + * Adds path as value to element. + * + * @param dataPath + * Path to data in data store. + * @param element + * {@link Element} + */ + @SuppressWarnings("rawtypes") + private void addPathAsValueToElement(final YangInstanceIdentifier dataPath, final Element element) { + final YangInstanceIdentifier normalizedPath = controllerContext.toXpathRepresentation(dataPath); + final StringBuilder textContent = new StringBuilder(); + + for (final PathArgument pathArgument : normalizedPath.getPathArguments()) { + if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) { + continue; + } + textContent.append("/"); + writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType()); + if (pathArgument instanceof NodeIdentifierWithPredicates) { + for (final Entry<QName, Object> entry : ((NodeIdentifierWithPredicates) pathArgument).entrySet()) { + final QName keyValue = entry.getKey(); + final String predicateValue = String.valueOf(entry.getValue()); + textContent.append("["); + writeIdentifierWithNamespacePrefix(element, textContent, keyValue); + textContent.append("='"); + textContent.append(predicateValue); + textContent.append("'"); + textContent.append("]"); + } + } else if (pathArgument instanceof NodeWithValue) { + textContent.append("[.='"); + textContent.append(((NodeWithValue) pathArgument).getValue()); + textContent.append("'"); + textContent.append("]"); + } + } + element.setTextContent(textContent.toString()); + } + + /** + * Writes identifier that consists of prefix and QName. + * + * @param element + * {@link Element} + * @param textContent + * StringBuilder + * @param qualifiedName + * QName + */ + private void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent, + final QName qualifiedName) { + final Module module = controllerContext.getGlobalSchema().findModule(qualifiedName.getModule()) + .get(); + + textContent.append(module.getName()); + textContent.append(":"); + textContent.append(qualifiedName.getLocalName()); + } + + /** + * Consists of three types {@link Operation#CREATED}, + * {@link Operation#UPDATED} and {@link Operation#DELETED}. + */ + private enum Operation { + CREATED("created"), UPDATED("updated"), DELETED("deleted"); + + private final String value; + + Operation(final String value) { + this.value = value; + } + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java new file mode 100644 index 0000000..3061285 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.time.Instant; +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.dom.DOMResult; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +/** + * {@link NotificationListenerAdapter} is responsible to track events on notifications. + */ +public final class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener { + private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class); + + private final ControllerContext controllerContext; + private final String streamName; + private final Absolute path; + private final String outputType; + + /** + * Set path of listener and stream name, register event bus. + * + * @param path + * path of notification + * @param streamName + * stream name of listener + * @param outputType + * type of output on notification (JSON, XML) + */ + NotificationListenerAdapter(final Absolute path, final String streamName, final String outputType, + final ControllerContext controllerContext) { + register(this); + this.outputType = requireNonNull(outputType); + this.path = requireNonNull(path); + checkArgument(streamName != null && !streamName.isEmpty()); + this.streamName = streamName; + this.controllerContext = controllerContext; + } + + /** + * Get outputType of listener. + * + * @return the outputType + */ + @Override + public String getOutputType() { + return outputType; + } + + @Override + public void onNotification(final DOMNotification notification) { + final Instant now = Instant.now(); + if (!checkStartStop(now, this)) { + return; + } + + final EffectiveModelContext schemaContext = controllerContext.getGlobalSchema(); + final String xml = prepareXml(schemaContext, notification); + if (checkFilter(xml)) { + prepareAndPostData(outputType.equals("JSON") ? prepareJson(schemaContext, notification) : xml); + } + } + + /** + * Get stream name of this listener. + * + * @return {@link String} + */ + @Override + public String getStreamName() { + return streamName; + } + + /** + * Get schema path of notification. + * + * @return {@link Absolute} SchemaNodeIdentifier + */ + public Absolute getSchemaPath() { + return path; + } + + /** + * Prepare data of notification and data to client. + * + * @param data data + */ + private void prepareAndPostData(final String data) { + final Event event = new Event(EventType.NOTIFY); + event.setData(data); + post(event); + } + + /** + * Prepare json from notification data. + * + * @return json as {@link String} + */ + @VisibleForTesting + String prepareJson(final EffectiveModelContext schemaContext, final DOMNotification notification) { + final JsonObject json = new JsonObject(); + json.add("ietf-restconf:notification", JsonParser.parseString(writeBodyToString(schemaContext, notification))); + json.addProperty("event-time", ListenerAdapter.toRFC3339(Instant.now())); + return json.toString(); + } + + private static String writeBodyToString(final EffectiveModelContext schemaContext, + final DOMNotification notification) { + final Writer writer = new StringWriter(); + final NormalizedNodeStreamWriter jsonStream = JSONNormalizedNodeStreamWriter.createExclusiveWriter( + JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext), + notification.getType(), null, JsonWriterFactory.createJsonWriter(writer)); + final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream); + try { + nodeWriter.write(notification.getBody()); + nodeWriter.close(); + } catch (final IOException e) { + throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e); + } + return writer.toString(); + } + + private String prepareXml(final EffectiveModelContext schemaContext, final DOMNotification notification) { + final Document doc = createDocument(); + final Element notificationElement = basePartDoc(doc); + + final Element notificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream"); + addValuesToNotificationEventElement(doc, notificationEventElement, schemaContext, notification); + notificationElement.appendChild(notificationEventElement); + + return transformDoc(doc); + } + + private void addValuesToNotificationEventElement(final Document doc, final Element element, + final EffectiveModelContext schemaContext, final DOMNotification notification) { + try { + final DOMResult domResult = writeNormalizedNode(notification.getBody(), + SchemaInferenceStack.of(schemaContext, path).toInference()); + final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); + final Element dataElement = doc.createElement("notification"); + dataElement.appendChild(result); + element.appendChild(dataElement); + } catch (final IOException e) { + LOG.error("Error in writer ", e); + } catch (final XMLStreamException e) { + LOG.error("Error processing stream", e); + } + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java new file mode 100644 index 0000000..5bfa79f --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Notificator} is responsible to create, remove and find + * {@link ListenerAdapter} listener. + */ +public final class Notificator { + + private static Map<String, ListenerAdapter> dataChangeListener = new ConcurrentHashMap<>(); + private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName = + new ConcurrentHashMap<>(); + + private static final Logger LOG = LoggerFactory.getLogger(Notificator.class); + private static final Lock LOCK = new ReentrantLock(); + + private Notificator() { + } + + /** + * Returns list of all stream names. + */ + public static Set<String> getStreamNames() { + return dataChangeListener.keySet(); + } + + /** + * Gets {@link ListenerAdapter} specified by stream name. + * + * @param streamName + * The name of the stream. + * @return {@link ListenerAdapter} specified by stream name. + */ + public static ListenerAdapter getListenerFor(final String streamName) { + return dataChangeListener.get(streamName); + } + + /** + * Checks if the listener specified by {@link YangInstanceIdentifier} path exist. + * + * @param streamName name of the stream + * @return True if the listener exist, false otherwise. + */ + public static boolean existListenerFor(final String streamName) { + return dataChangeListener.containsKey(streamName); + } + + /** + * Creates new {@link ListenerAdapter} listener from + * {@link YangInstanceIdentifier} path and stream name. + * + * @param path + * Path to data in data repository. + * @param streamName + * The name of the stream. + * @param outputType + * Spcific type of output for notifications - XML or JSON + * @return New {@link ListenerAdapter} listener from + * {@link YangInstanceIdentifier} path and stream name. + */ + public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName, + final NotificationOutputType outputType, final ControllerContext controllerContext) { + final ListenerAdapter listener = new ListenerAdapter(path, streamName, outputType, controllerContext); + try { + LOCK.lock(); + dataChangeListener.put(streamName, listener); + } finally { + LOCK.unlock(); + } + return listener; + } + + /** + * Looks for listener determined by {@link YangInstanceIdentifier} path and removes it. + * Creates String representation of stream name from URI. Removes slash from URI in start and end position. + * + * @param uri + * URI for creation stream name. + * @return String representation of stream name. + */ + public static String createStreamNameFromUri(final String uri) { + if (uri == null) { + return null; + } + String result = uri; + if (result.startsWith("/")) { + result = result.substring(1); + } + if (result.endsWith("/")) { + result = result.substring(0, result.length() - 1); + } + return result; + } + + /** + * Removes all listeners. + */ + @SuppressWarnings("checkstyle:IllegalCatch") + public static void removeAllListeners() { + for (final ListenerAdapter listener : dataChangeListener.values()) { + try { + listener.close(); + } catch (final Exception e) { + LOG.error("Failed to close listener", e); + } + } + try { + LOCK.lock(); + dataChangeListener = new ConcurrentHashMap<>(); + } finally { + LOCK.unlock(); + } + } + + /** + * Delete {@link ListenerAdapter} listener specified in parameter. + * + * @param <T> + * + * @param listener + * ListenerAdapter + */ + @SuppressWarnings("checkstyle:IllegalCatch") + private static <T extends BaseListenerInterface> void deleteListener(final T listener) { + if (listener != null) { + try { + listener.close(); + } catch (final Exception e) { + LOG.error("Failed to close listener", e); + } + try { + LOCK.lock(); + dataChangeListener.remove(listener.getStreamName()); + } finally { + LOCK.unlock(); + } + } + } + + /** + * Check if the listener specified by qnames of request exist. + * + * @param streamName + * name of stream + * @return True if the listener exist, false otherwise. + */ + public static boolean existNotificationListenerFor(final String streamName) { + return notificationListenersByStreamName.containsKey(streamName); + } + + /** + * Prepare listener for notification ({@link NotificationDefinition}). + * + * @param paths + * paths of notifications + * @param streamName + * name of stream (generated by paths) + * @param outputType + * type of output for onNotification - XML or JSON + * @return List of {@link NotificationListenerAdapter} by paths + */ + public static List<NotificationListenerAdapter> createNotificationListener(final List<Absolute> paths, + final String streamName, final String outputType, final ControllerContext controllerContext) { + final List<NotificationListenerAdapter> listListeners = new ArrayList<>(); + for (final Absolute path : paths) { + final NotificationListenerAdapter listener = + new NotificationListenerAdapter(path, streamName, outputType, controllerContext); + listListeners.add(listener); + } + try { + LOCK.lock(); + notificationListenersByStreamName.put(streamName, listListeners); + } finally { + LOCK.unlock(); + } + return listListeners; + } + + public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) { + if (!listener.hasSubscribers()) { + if (listener instanceof NotificationListenerAdapter) { + deleteNotificationListener(listener); + } else { + deleteListener(listener); + } + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) { + if (listener != null) { + try { + listener.close(); + } catch (final Exception e) { + LOG.error("Failed to close listener", e); + } + try { + LOCK.lock(); + notificationListenersByStreamName.remove(listener.getStreamName()); + } finally { + LOCK.unlock(); + } + } + } + + public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) { + return notificationListenersByStreamName.get(streamName); + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java new file mode 100644 index 0000000..a295c54 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServer.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.websockets; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.opendaylight.netconf.sal.streams.listeners.Notificator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link WebSocketServer} is the singleton responsible for starting and stopping the + * web socket server. + */ +public final class WebSocketServer implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); + + private static final String DEFAULT_ADDRESS = "0.0.0.0"; + + private static WebSocketServer instance = null; + + private final String address; + private final int port; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + + private WebSocketServer(final String address, final int port) { + this.address = address; + this.port = port; + } + + /** + * Create singleton instance of {@link WebSocketServer}. + * + * @param port TCP port used for this server + * @return instance of {@link WebSocketServer} + */ + private static WebSocketServer createInstance(final int port) { + instance = createInstance(DEFAULT_ADDRESS, port); + return instance; + } + + public static WebSocketServer createInstance(final String address, final int port) { + checkState(instance == null, "createInstance() has already been called"); + checkArgument(port >= 1024, "Privileged port (below 1024) is not allowed"); + + instance = new WebSocketServer(requireNonNull(address, "Address cannot be null."), port); + LOG.info("Created WebSocketServer on {}:{}", address, port); + return instance; + } + + /** + * Get the websocket of TCP port. + * + * @return websocket TCP port + */ + public int getPort() { + return port; + } + + /** + * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}. + * + * @return instance of {@link WebSocketServer} + */ + public static WebSocketServer getInstance() { + return requireNonNull(instance, "createInstance() must be called prior to getInstance()"); + } + + /** + * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}. + * If an instance doesnt exist create one with the provided fallback port. + * + * @return instance of {@link WebSocketServer} + */ + public static WebSocketServer getInstance(final int fallbackPort) { + if (instance != null) { + return instance; + } + + LOG.warn("No instance for WebSocketServer found, creating one with a fallback port: {}", fallbackPort); + return createInstance(fallbackPort); + } + + /** + * Destroy the existing instance. + */ + public static void destroyInstance() { + checkState(instance != null, "createInstance() must be called prior to destroyInstance()"); + + instance.stop(); + instance = null; + LOG.info("Destroyed WebSocketServer."); + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public void run() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + final ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new WebSocketServerInitializer()); + + final Channel channel = serverBootstrap.bind(address, port).sync().channel(); + LOG.info("Web socket server started at address {}, port {}.", address, port); + + channel.closeFuture().sync(); + } catch (final InterruptedException e) { + LOG.error("Web socket server encountered an error during startup attempt on port {}", port, e); + } catch (Throwable throwable) { + // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them + LOG.error("Error while binding to address {}, port {}", address, port, throwable); + throw throwable; + } finally { + stop(); + } + } + + /** + * Stops the web socket server and removes all listeners. + */ + private void stop() { + LOG.info("Stopping the web socket server instance on port {}", port); + Notificator.removeAllListeners(); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + bossGroup = null; + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + workerGroup = null; + } + } + +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java new file mode 100644 index 0000000..ed90e3f --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.websockets; + +import static io.netty.handler.codec.http.HttpHeaderNames.HOST; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; +import static io.netty.handler.codec.http.HttpUtil.setContentLength; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import io.netty.util.CharsetUtil; +import java.util.List; +import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl; +import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.Notificator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle + * {@link FullHttpRequest} and {@link WebSocketFrame} messages. + */ +public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { + private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class); + + private WebSocketServerHandshaker handshaker; + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + /** + * Checks if HTTP request method is GET and if is possible to decode HTTP result of request. + * + * @param ctx ChannelHandlerContext + * @param req FullHttpRequest + */ + private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) { + // Handle a bad request. + if (!req.decoderResult().isSuccess()) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); + return; + } + + // Allow only GET methods. + if (req.method() != GET) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); + return; + } + + final String streamName = Notificator.createStreamNameFromUri(req.uri()); + if (streamName.contains(RestconfImpl.DATA_SUBSCR)) { + final ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.addSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully registered."); + } else { + LOG.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } + } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) { + final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName); + if (listeners != null && !listeners.isEmpty()) { + for (final NotificationListenerAdapter listener : listeners) { + listener.addSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully registered."); + } + } else { + LOG.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } + } + + // Handshake + final WebSocketServerHandshakerFactory wsFactory = + new WebSocketServerHandshakerFactory(getWebSocketLocation(req), + null, false); + this.handshaker = wsFactory.newHandshaker(req); + if (this.handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); + } else { + this.handshaker.handshake(ctx.channel(), req); + } + } + + /** + * Checks response status, send response and close connection if necessary. + * + * @param ctx ChannelHandlerContext + * @param req HttpRequest + * @param res FullHttpResponse + */ + private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req, + final FullHttpResponse res) { + // Generate an error page if response getStatus code is not OK (200). + final boolean notOkay = !OK.equals(res.status()); + if (notOkay) { + res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8); + setContentLength(res, res.content().readableBytes()); + } + + // Send the response and close the connection if necessary. + final ChannelFuture f = ctx.channel().writeAndFlush(res); + if (notOkay || !isKeepAlive(req)) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + /** + * Handles web socket frame. + * + * @param ctx {@link ChannelHandlerContext} + * @param frame {@link WebSocketFrame} + */ + private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) { + if (frame instanceof CloseWebSocketFrame) { + this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); + if (streamName.contains(RestconfImpl.DATA_SUBSCR)) { + final ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.removeSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully registered."); + + Notificator.removeListenerIfNoSubscriberExists(listener); + } + } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) { + final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName); + if (listeners != null && !listeners.isEmpty()) { + for (final NotificationListenerAdapter listener : listeners) { + listener.removeSubscriber(ctx.channel()); + } + } + } + return; + } else if (frame instanceof PingWebSocketFrame) { + ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); + return; + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + ctx.close(); + } + + /** + * Get web socket location from HTTP request. + * + * @param req HTTP request from which the location will be returned + * @return String representation of web socket location. + */ + private static String getWebSocketLocation(final HttpRequest req) { + return "ws://" + req.headers().get(HOST) + req.uri(); + } +} diff --git a/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java new file mode 100644 index 0000000..365e982 --- /dev/null +++ b/netconf/restconf/restconf-nb-bierman02/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerInitializer.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.sal.streams.websockets; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +/** + * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of a {@link io.netty.channel.Channel} + * . + */ +public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { + + @Override + protected void initChannel(final SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("codec-http", new HttpServerCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + pipeline.addLast("handler", new WebSocketServerHandler()); + } + +} |