From f3969004c6ccac18e742c5fc48c844e315991023 Mon Sep 17 00:00:00 2001 From: Michael DÜrre Date: Thu, 8 Apr 2021 06:34:22 +0200 Subject: update websocketmanager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit update complete notification flow Issue-ID: CCSDK-3252 Signed-off-by: Michael DÜrre Change-Id: I87ba00f615707b942471fcace57bcda50ce37e61 --- .../impl/access/NetconfAccessorImpl.java | 8 + .../impl/access/NetconfCommunicatorManager.java | 17 +- .../access/binding/NetconfBindingAccessorImpl.java | 92 ++++++++++- .../binding/NetconfBindingNotificationsImpl.java | 148 ----------------- .../impl/access/dom/NetconfDomAccessorImpl.java | 184 +++++++++++++++++---- .../access/dom/NotificationServiceNotProvided.java | 44 +++++ .../impl/mdsal/MdsalApi.java | 47 ++++++ 7 files changed, 353 insertions(+), 187 deletions(-) delete mode 100644 sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java create mode 100644 sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java create mode 100644 sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java (limited to 'sdnr/wt/netconfnode-state-service/provider/src/main') diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java index 8eba4e7ef..a1a35401e 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java @@ -113,4 +113,12 @@ public class NetconfAccessorImpl implements NetconfAccessor { return netconfNodeStateService.getDataBroker(); } + /** + * check if nc-notifications.yang is supported by the device + */ + @Override + public boolean isNotificationsRFC5277Supported() { + return getCapabilites().isSupportingNamespace("urn:ietf:params:netconf:capability:notification:1.0"); + } + } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java index bff29acc1..30afb4a20 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java @@ -25,7 +25,7 @@ import java.util.Optional; import org.eclipse.jdt.annotation.NonNull; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingNotificationsImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingAccessorImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.NetconfDomAccessorImpl; import org.opendaylight.mdsal.binding.api.DataBroker; @@ -92,7 +92,8 @@ public class NetconfCommunicatorManager { LOG.info("Slave mountpoint {} without databroker", mountPointNodeName); } else { LOG.info("Master mountpoint {}", mountPointNodeName); - return Optional.of(new NetconfBindingNotificationsImpl(accessor, optionalNetconfNodeDatabroker.get(), mountPoint)); + return Optional.of( + new NetconfBindingAccessorImpl(accessor, optionalNetconfNodeDatabroker.get(), mountPoint)); } } return Optional.empty(); @@ -104,15 +105,17 @@ public class NetconfCommunicatorManager { .node(Topology.QNAME) .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id").intern(), "topology-netconf") .node(Node.QNAME) - .nodeWithKey(Node.QNAME, QName.create(Node.QNAME, "node-id").intern(), accessor.getNodeId().getValue()).build(); - final Optional mountPoint = domMountPointService.getMountPoint(mountpointPath); - if (mountPoint.isEmpty()) { + .nodeWithKey(Node.QNAME, QName.create(Node.QNAME, "node-id").intern(), accessor.getNodeId().getValue()) + .build(); + final Optional oMountPoint = domMountPointService.getMountPoint(mountpointPath); + if (oMountPoint.isEmpty()) { return Optional.empty(); } - final Optional domDataBroker = mountPoint.get().getService(DOMDataBroker.class); + final Optional domDataBroker = oMountPoint.get().getService(DOMDataBroker.class); if (domDataBroker.isPresent()) { - return Optional.of(new NetconfDomAccessorImpl(accessor, domDataBroker.get(), mountPoint.get(), domContext)); + return Optional + .of(new NetconfDomAccessorImpl(accessor, domDataBroker.get(), oMountPoint.get(), domContext)); } return Optional.empty(); } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java index 9b9e96c15..9c10f0bae 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java @@ -17,21 +17,41 @@ */ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.onap.ccsdk.features.sdnr.wt.common.YangHelper; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.TransactionUtils; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.MountPoint; import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionOutput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class NetconfBindingAccessorImpl extends NetconfAccessorImpl implements NetconfBindingAccessor { +public class NetconfBindingAccessorImpl extends NetconfAccessorImpl implements NetconfBindingAccessor { private static final Logger log = LoggerFactory.getLogger(NetconfBindingAccessorImpl.class); @@ -82,4 +102,74 @@ public abstract class NetconfBindingAccessorImpl extends NetconfAccessorImpl imp mountpoint.getIdentifier().toString(), optionalNotificationService, ranListenerRegistration); return ranListenerRegistration; } + + + @Override + public ListenableFuture> registerNotificationsStream( + @NonNull String streamName) { + String failMessage = ""; + final Optional optionalRpcConsumerService = + getMountpoint().getService(RpcConsumerRegistry.class); + if (optionalRpcConsumerService.isPresent()) { + final NotificationsService rpcService = + optionalRpcConsumerService.get().getRpcService(NotificationsService.class); + + final CreateSubscriptionInputBuilder createSubscriptionInputBuilder = new CreateSubscriptionInputBuilder(); + createSubscriptionInputBuilder.setStream(new StreamNameType(streamName)); + log.info("Event listener triggering notification stream {} for node {}", streamName, getNodeId()); + try { + CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build(); + if (createSubscriptionInput == null) { + failMessage = "createSubscriptionInput is null for mountpoint " + getNodeId(); + } else { + // Regular case, return value + return rpcService.createSubscription(createSubscriptionInput); + } + } catch (NullPointerException e) { + failMessage = "createSubscription failed"; + } + } else { + failMessage = "No RpcConsumerRegistry avaialble."; + } + //Be here only in case of problem and return failed indication + log.warn(failMessage); + RpcResultBuilder result = RpcResultBuilder.failed(); + result.withError(ErrorType.APPLICATION, failMessage); + SettableFuture> future = SettableFuture.create(); + future.set(result.build()); + return future; + } + + @Override + public void registerNotificationsStream(List streamList) { + for (Stream stream : streamList) { + @Nullable + StreamNameType streamName = stream.getName(); + if (streamName != null) { + String streamNameValue = stream.getName().getValue(); + log.info("Stream Name = {}, Stream Description = {}", streamNameValue, stream.getDescription()); + if (!(streamNameValue.equals(DefaultNotificationsStream))) + // Register any not default stream. Default stream is already registered + registerNotificationsStream(streamNameValue); + } else { + log.warn("Ignore a stream without name"); + } + } + } + + @Override + public List getNotificationStreams() { + final Class netconfClazz = Netconf.class; + InstanceIdentifier streamsIID = InstanceIdentifier.builder(netconfClazz).build(); + + Netconf res = getTransactionUtils().readData(getDataBroker(), LogicalDatastoreType.OPERATIONAL, streamsIID); + if (res != null) { + Streams streams = res.getStreams(); + if (streams != null) { + return YangHelper.getList(streams.nonnullStream()); + } + } + return Collections.emptyList(); + } + } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java deleted file mode 100644 index 46ff07b2c..000000000 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import org.eclipse.jdt.annotation.NonNull; -import org.eclipse.jdt.annotation.Nullable; -import org.onap.ccsdk.features.sdnr.wt.common.YangHelper; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNotifications; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; -import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.mdsal.binding.api.MountPoint; -import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionOutput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NetconfBindingNotificationsImpl extends NetconfBindingAccessorImpl implements NetconfNotifications { - - private static final Logger log = LoggerFactory.getLogger(NetconfAccessorImpl.class); - - public NetconfBindingNotificationsImpl(NetconfAccessorImpl accessor, DataBroker dataBroker, MountPoint mountpoint) { - super(accessor, dataBroker, mountpoint); - } - - @Override - public ListenableFuture> registerNotificationsStream( - @NonNull String streamName) { - String failMessage = ""; - final Optional optionalRpcConsumerService = - getMountpoint().getService(RpcConsumerRegistry.class); - if (optionalRpcConsumerService.isPresent()) { - final NotificationsService rpcService = - optionalRpcConsumerService.get().getRpcService(NotificationsService.class); - - final CreateSubscriptionInputBuilder createSubscriptionInputBuilder = new CreateSubscriptionInputBuilder(); - createSubscriptionInputBuilder.setStream(new StreamNameType(streamName)); - log.info("Event listener triggering notification stream {} for node {}", streamName, getNodeId()); - try { - CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build(); - if (createSubscriptionInput == null) { - failMessage = "createSubscriptionInput is null for mountpoint " + getNodeId(); - } else { - // Regular case, return value - return rpcService.createSubscription(createSubscriptionInput); - } - } catch (NullPointerException e) { - failMessage = "createSubscription failed"; - } - } else { - failMessage = "No RpcConsumerRegistry avaialble."; - } - //Be here only in case of problem and return failed indication - log.warn(failMessage); - RpcResultBuilder result = RpcResultBuilder.failed(); - result.withError(ErrorType.APPLICATION, failMessage); - SettableFuture> future = SettableFuture.create(); - future.set(result.build()); - return future; - } - - @Override - public void registerNotificationsStream(List streamList) { - for (Stream stream : streamList) { - @Nullable - StreamNameType streamName = stream.getName(); - if (streamName != null) { - String streamNameValue = stream.getName().getValue(); - log.info("Stream Name = {}, Stream Description = {}", streamNameValue, stream.getDescription()); - if (!(streamNameValue.equals(NetconfNotifications.DefaultNotificationsStream))) - // Register any not default stream. Default stream is already registered - registerNotificationsStream(streamNameValue); - } else { - log.warn("Ignore a stream without name"); - } - } - } - - @Override - public boolean isNotificationsSupported() { - return false; - } - - - /** - * check if nc-notifications.yang is supported by the device - */ - @Override - public boolean isNCNotificationsSupported() { - return getCapabilites().isSupportingNamespace(Netconf.QNAME); - } - - @Override - public List getNotificationStreams() { - final Class netconfClazz = Netconf.class; - InstanceIdentifier streamsIID = InstanceIdentifier.builder(netconfClazz).build(); - - Netconf res = getTransactionUtils().readData(getDataBroker(), LogicalDatastoreType.OPERATIONAL, streamsIID); - if (res != null) { - Streams streams = res.getStreams(); - if (streams != null) { - return YangHelper.getList(streams.nonnullStream()); - } - } - return Collections.emptyList(); - } - - @Override - public Optional getNotificationAccessor() { - return Optional.of(this); - } - -} diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java index 4eaec246e..caf3da1a7 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java @@ -21,9 +21,16 @@ */ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom; +import static java.util.stream.Collectors.toList; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.ListenableFuture; +import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; @@ -34,6 +41,7 @@ import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; @@ -42,10 +50,23 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; import org.opendaylight.mdsal.dom.api.DOMMountPoint; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -55,16 +76,29 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class); - private final DOMDataBroker dataBroker; - private final DOMMountPoint mountpoint; - private final DomContext domContext; + private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription"); + private static final SchemaPath RPC_PATH_CREATE_SUBSCRIPTION = + NetconfMessageTransformUtil.toPath(CREATE_SUBSCRIPTION); + private static final YangInstanceIdentifier STREAMS_PATH = + YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build(); - public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker, DOMMountPoint mountPoint, - DomContext domContext) { + protected final DOMDataBroker dataBroker; + protected final DOMMountPoint mountpoint; + protected final DomContext domContext; + private final DOMNotificationService notificationService; + private final BindingNormalizedNodeSerializer serializer; + private final DOMRpcService rpcService; + + + public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker, + DOMMountPoint mountpoint, DomContext domContext) { super(accessor); this.dataBroker = Objects.requireNonNull(domDataBroker); - this.mountpoint = Objects.requireNonNull(mountPoint); + this.mountpoint = Objects.requireNonNull(mountpoint); this.domContext = Objects.requireNonNull(domContext); + this.serializer = domContext.getBindingNormalizedNodeSerializer(); + this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class); + this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class); } @Override @@ -77,28 +111,6 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco return mountpoint; } - @Override - public @NonNull ListenerRegistration doRegisterNotificationListener( - @NonNull T listener, Collection types) { - LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString()); - final Optional optionalNotificationService = - mountpoint.getService(DOMNotificationService.class); - if (optionalNotificationService.isPresent()) { - final ListenerRegistration ranListenerRegistration = - optionalNotificationService.get().registerNotificationListener(listener, types); - LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}", - mountpoint.getIdentifier().toString(), optionalNotificationService, ranListenerRegistration); - return ranListenerRegistration; - } - throw new IllegalArgumentException("Can not get notification service"); - } - - @Override - public @NonNull ListenerRegistration doRegisterNotificationListener( - @NonNull T listener, SchemaPath[] types) { - return doRegisterNotificationListener(listener, Arrays.asList(types)); - } - @Override public Optional readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path, Class clazz) { @@ -121,11 +133,11 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco try (DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction()) { FluentFuture>> foData = readOnlyTransaction.read(dataStoreType, path); // RAVI - Add a few debug here, like what ? Speak to Micha.... - + Optional> data = foData.get(120, TimeUnit.SECONDS); LOG.info("read is done - {} ", foData.isDone()); return data; - + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.info("Incomplete read to node transaction {} {}", dataStoreType, path, e); return Optional.empty(); @@ -138,7 +150,8 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco throws CanNotConvertException { if (oData.isPresent()) { NormalizedNode data = oData.get(); - LOG.debug("data identifier: {}", data.getIdentifier()); + LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(), + data.getNodeType()); @Nullable Entry, DataObject> entry = serializer.fromNormalizedNode(path, data); if (entry != null) { @@ -158,4 +171,113 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco throw new CanNotConvertException("No data received for path:" + path); } } + + @Override + public @NonNull ListenerRegistration doRegisterNotificationListener( + @NonNull T listener, Collection types) { + LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString()); + + final ListenerRegistration ranListenerRegistration = + notificationService.registerNotificationListener(listener, types); + + LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}", + mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration); + + return ranListenerRegistration; + } + + @Override + public @NonNull ListenerRegistration doRegisterNotificationListener( + @NonNull T listener, SchemaPath[] types) { + return doRegisterNotificationListener(listener, Arrays.asList(types)); + } + + @Override + public @NonNull ListenerRegistration doRegisterNotificationListener( + @NonNull T listener, QName[] types) { + List schemaPathList = Arrays.stream(types).map(qname -> NetconfMessageTransformUtil.toPath(qname)).collect(toList()); + return doRegisterNotificationListener(listener, schemaPathList); + } + + + @Override + public ListenableFuture invokeCreateSubscription(CreateSubscriptionInput input) { + final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input); + return rpcService.invokeRpc(RPC_PATH_CREATE_SUBSCRIPTION, nnInput); + } + + @Override + public ListenableFuture invokeCreateSubscription(Optional oStream, + Optional filter, Optional startTime, Optional stopTime) { + + CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder(); + boolean replayIsSupported = false; + if (oStream.isPresent()) { + Stream stream = oStream.get(); + if (stream.getName() != null) { + inputBuilder.setStream(stream.getName()); + } + replayIsSupported = Boolean.TRUE.equals(stream.isReplaySupport()); + + } + if (filter.isPresent()) { + inputBuilder.setFilter(filter.get()); + } + if (startTime.isPresent()) { + if (replayIsSupported) { + inputBuilder.setStartTime(getDateAndTime(startTime.get())); + if (stopTime.isPresent()) { + if (startTime.get().isBefore(stopTime.get())) { + inputBuilder.setStopTime(getDateAndTime(stopTime.get())); + } else { + throw new IllegalArgumentException("stopTime must be later than startTime"); + } + } + } else { + throw new IllegalArgumentException("Replay not supported by this stream."); + } + } + return invokeCreateSubscription(inputBuilder.build()); + } + + @Override + public ListenableFuture invokeCreateSubscription(Stream... streams) { + ListenableFuture res; + if (streams.length == 0) { + return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + } else if (streams.length == 1) { + return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(), + Optional.empty()); + } else { + for (Stream stream : streams) { + res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(), + Optional.empty()); + try { + if (!res.get().getErrors().isEmpty()) { + return res; + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Exception during rpc call", e); + return res; + } + } + } + throw new IllegalStateException("Could never be reached"); //avoid eclipse error + } + + @Override + public @NonNull Map getNotificationStreamsAsMap() { + Optional oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class); + return oStreams.isPresent() ? oStreams.get().nonnullStream() : Collections.emptyMap(); + } + + @Override + public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() { + return serializer; + } + + private DateAndTime getDateAndTime(Instant dateTime) { + final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime); + return new DateAndTime(formattedDate); + } } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java new file mode 100644 index 000000000..c66a20c32 --- /dev/null +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom; + +public class NotificationServiceNotProvided extends Exception { + + private static final long serialVersionUID = 1L; + + public NotificationServiceNotProvided() { + super(); + } + + public NotificationServiceNotProvided(String message) { + super(message); + } + + public NotificationServiceNotProvided(String message, Throwable cause) { + super(message, cause); + } + + public NotificationServiceNotProvided(Throwable cause) { + super(cause); + } + +} diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java new file mode 100644 index 000000000..6f06641af --- /dev/null +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal; + +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMService; + +/** + * Collecting utilities for mdsal api + */ +public class MdsalApi { + /** + * Get mountpoint service and throw exception if not available + * @param + * @param mountPoint getting the service from + * @param service class to request + * @return service requested or throw + * @throws IllegalStateException + */ + public static T getMountpointService(final DOMMountPoint mountPoint, final Class service) { + final Optional optional = mountPoint.getService(service); + Preconditions.checkState(optional.isPresent(), "Service not present on mount point: %s", service.getName()); + return optional.get(); + } + +} -- cgit 1.2.3-korg