diff options
author | Michael DÜrre <michael.duerre@highstreet-technologies.com> | 2021-04-08 06:34:22 +0200 |
---|---|---|
committer | Michael DÜrre <michael.duerre@highstreet-technologies.com> | 2021-04-08 06:34:46 +0200 |
commit | f3969004c6ccac18e742c5fc48c844e315991023 (patch) | |
tree | f5486a62e842bb16ca7d3af47a8663df08feef55 /sdnr/wt/netconfnode-state-service/provider/src | |
parent | a252be83694ae33260d99d5371ed48c1558aa2e8 (diff) |
update websocketmanager
update complete notification flow
Issue-ID: CCSDK-3252
Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com>
Change-Id: I87ba00f615707b942471fcace57bcda50ce37e61
Diffstat (limited to 'sdnr/wt/netconfnode-state-service/provider/src')
8 files changed, 424 insertions, 194 deletions
diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java index 8eba4e7ef..a1a35401e 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfAccessorImpl.java @@ -113,4 +113,12 @@ public class NetconfAccessorImpl implements NetconfAccessor { return netconfNodeStateService.getDataBroker(); } + /** + * check if nc-notifications.yang is supported by the device + */ + @Override + public boolean isNotificationsRFC5277Supported() { + return getCapabilites().isSupportingNamespace("urn:ietf:params:netconf:capability:notification:1.0"); + } + } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java index bff29acc1..30afb4a20 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/NetconfCommunicatorManager.java @@ -25,7 +25,7 @@ import java.util.Optional; import org.eclipse.jdt.annotation.NonNull; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingNotificationsImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingAccessorImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.NetconfDomAccessorImpl; import org.opendaylight.mdsal.binding.api.DataBroker; @@ -92,7 +92,8 @@ public class NetconfCommunicatorManager { LOG.info("Slave mountpoint {} without databroker", mountPointNodeName); } else { LOG.info("Master mountpoint {}", mountPointNodeName); - return Optional.of(new NetconfBindingNotificationsImpl(accessor, optionalNetconfNodeDatabroker.get(), mountPoint)); + return Optional.of( + new NetconfBindingAccessorImpl(accessor, optionalNetconfNodeDatabroker.get(), mountPoint)); } } return Optional.empty(); @@ -104,15 +105,17 @@ public class NetconfCommunicatorManager { .node(Topology.QNAME) .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id").intern(), "topology-netconf") .node(Node.QNAME) - .nodeWithKey(Node.QNAME, QName.create(Node.QNAME, "node-id").intern(), accessor.getNodeId().getValue()).build(); - final Optional<DOMMountPoint> mountPoint = domMountPointService.getMountPoint(mountpointPath); - if (mountPoint.isEmpty()) { + .nodeWithKey(Node.QNAME, QName.create(Node.QNAME, "node-id").intern(), accessor.getNodeId().getValue()) + .build(); + final Optional<DOMMountPoint> oMountPoint = domMountPointService.getMountPoint(mountpointPath); + if (oMountPoint.isEmpty()) { return Optional.empty(); } - final Optional<DOMDataBroker> domDataBroker = mountPoint.get().getService(DOMDataBroker.class); + final Optional<DOMDataBroker> domDataBroker = oMountPoint.get().getService(DOMDataBroker.class); if (domDataBroker.isPresent()) { - return Optional.of(new NetconfDomAccessorImpl(accessor, domDataBroker.get(), mountPoint.get(), domContext)); + return Optional + .of(new NetconfDomAccessorImpl(accessor, domDataBroker.get(), oMountPoint.get(), domContext)); } return Optional.empty(); } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java index 9b9e96c15..9c10f0bae 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingAccessorImpl.java @@ -17,21 +17,41 @@ */ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.onap.ccsdk.features.sdnr.wt.common.YangHelper; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.TransactionUtils; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.MountPoint; import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionOutput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class NetconfBindingAccessorImpl extends NetconfAccessorImpl implements NetconfBindingAccessor { +public class NetconfBindingAccessorImpl extends NetconfAccessorImpl implements NetconfBindingAccessor { private static final Logger log = LoggerFactory.getLogger(NetconfBindingAccessorImpl.class); @@ -82,4 +102,74 @@ public abstract class NetconfBindingAccessorImpl extends NetconfAccessorImpl imp mountpoint.getIdentifier().toString(), optionalNotificationService, ranListenerRegistration); return ranListenerRegistration; } + + + @Override + public ListenableFuture<RpcResult<CreateSubscriptionOutput>> registerNotificationsStream( + @NonNull String streamName) { + String failMessage = ""; + final Optional<RpcConsumerRegistry> optionalRpcConsumerService = + getMountpoint().getService(RpcConsumerRegistry.class); + if (optionalRpcConsumerService.isPresent()) { + final NotificationsService rpcService = + optionalRpcConsumerService.get().getRpcService(NotificationsService.class); + + final CreateSubscriptionInputBuilder createSubscriptionInputBuilder = new CreateSubscriptionInputBuilder(); + createSubscriptionInputBuilder.setStream(new StreamNameType(streamName)); + log.info("Event listener triggering notification stream {} for node {}", streamName, getNodeId()); + try { + CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build(); + if (createSubscriptionInput == null) { + failMessage = "createSubscriptionInput is null for mountpoint " + getNodeId(); + } else { + // Regular case, return value + return rpcService.createSubscription(createSubscriptionInput); + } + } catch (NullPointerException e) { + failMessage = "createSubscription failed"; + } + } else { + failMessage = "No RpcConsumerRegistry avaialble."; + } + //Be here only in case of problem and return failed indication + log.warn(failMessage); + RpcResultBuilder<CreateSubscriptionOutput> result = RpcResultBuilder.failed(); + result.withError(ErrorType.APPLICATION, failMessage); + SettableFuture<RpcResult<CreateSubscriptionOutput>> future = SettableFuture.create(); + future.set(result.build()); + return future; + } + + @Override + public void registerNotificationsStream(List<Stream> streamList) { + for (Stream stream : streamList) { + @Nullable + StreamNameType streamName = stream.getName(); + if (streamName != null) { + String streamNameValue = stream.getName().getValue(); + log.info("Stream Name = {}, Stream Description = {}", streamNameValue, stream.getDescription()); + if (!(streamNameValue.equals(DefaultNotificationsStream))) + // Register any not default stream. Default stream is already registered + registerNotificationsStream(streamNameValue); + } else { + log.warn("Ignore a stream without name"); + } + } + } + + @Override + public List<Stream> getNotificationStreams() { + final Class<Netconf> netconfClazz = Netconf.class; + InstanceIdentifier<Netconf> streamsIID = InstanceIdentifier.builder(netconfClazz).build(); + + Netconf res = getTransactionUtils().readData(getDataBroker(), LogicalDatastoreType.OPERATIONAL, streamsIID); + if (res != null) { + Streams streams = res.getStreams(); + if (streams != null) { + return YangHelper.getList(streams.nonnullStream()); + } + } + return Collections.emptyList(); + } + } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java deleted file mode 100644 index 46ff07b2c..000000000 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/binding/NetconfBindingNotificationsImpl.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import org.eclipse.jdt.annotation.NonNull; -import org.eclipse.jdt.annotation.Nullable; -import org.onap.ccsdk.features.sdnr.wt.common.YangHelper; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNotifications; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; -import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.mdsal.binding.api.MountPoint; -import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionOutput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NetconfBindingNotificationsImpl extends NetconfBindingAccessorImpl implements NetconfNotifications { - - private static final Logger log = LoggerFactory.getLogger(NetconfAccessorImpl.class); - - public NetconfBindingNotificationsImpl(NetconfAccessorImpl accessor, DataBroker dataBroker, MountPoint mountpoint) { - super(accessor, dataBroker, mountpoint); - } - - @Override - public ListenableFuture<RpcResult<CreateSubscriptionOutput>> registerNotificationsStream( - @NonNull String streamName) { - String failMessage = ""; - final Optional<RpcConsumerRegistry> optionalRpcConsumerService = - getMountpoint().getService(RpcConsumerRegistry.class); - if (optionalRpcConsumerService.isPresent()) { - final NotificationsService rpcService = - optionalRpcConsumerService.get().getRpcService(NotificationsService.class); - - final CreateSubscriptionInputBuilder createSubscriptionInputBuilder = new CreateSubscriptionInputBuilder(); - createSubscriptionInputBuilder.setStream(new StreamNameType(streamName)); - log.info("Event listener triggering notification stream {} for node {}", streamName, getNodeId()); - try { - CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build(); - if (createSubscriptionInput == null) { - failMessage = "createSubscriptionInput is null for mountpoint " + getNodeId(); - } else { - // Regular case, return value - return rpcService.createSubscription(createSubscriptionInput); - } - } catch (NullPointerException e) { - failMessage = "createSubscription failed"; - } - } else { - failMessage = "No RpcConsumerRegistry avaialble."; - } - //Be here only in case of problem and return failed indication - log.warn(failMessage); - RpcResultBuilder<CreateSubscriptionOutput> result = RpcResultBuilder.failed(); - result.withError(ErrorType.APPLICATION, failMessage); - SettableFuture<RpcResult<CreateSubscriptionOutput>> future = SettableFuture.create(); - future.set(result.build()); - return future; - } - - @Override - public void registerNotificationsStream(List<Stream> streamList) { - for (Stream stream : streamList) { - @Nullable - StreamNameType streamName = stream.getName(); - if (streamName != null) { - String streamNameValue = stream.getName().getValue(); - log.info("Stream Name = {}, Stream Description = {}", streamNameValue, stream.getDescription()); - if (!(streamNameValue.equals(NetconfNotifications.DefaultNotificationsStream))) - // Register any not default stream. Default stream is already registered - registerNotificationsStream(streamNameValue); - } else { - log.warn("Ignore a stream without name"); - } - } - } - - @Override - public boolean isNotificationsSupported() { - return false; - } - - - /** - * check if nc-notifications.yang is supported by the device - */ - @Override - public boolean isNCNotificationsSupported() { - return getCapabilites().isSupportingNamespace(Netconf.QNAME); - } - - @Override - public List<Stream> getNotificationStreams() { - final Class<Netconf> netconfClazz = Netconf.class; - InstanceIdentifier<Netconf> streamsIID = InstanceIdentifier.builder(netconfClazz).build(); - - Netconf res = getTransactionUtils().readData(getDataBroker(), LogicalDatastoreType.OPERATIONAL, streamsIID); - if (res != null) { - Streams streams = res.getStreams(); - if (streams != null) { - return YangHelper.getList(streams.nonnullStream()); - } - } - return Collections.emptyList(); - } - - @Override - public Optional<NetconfNotifications> getNotificationAccessor() { - return Optional.of(this); - } - -} diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java index 4eaec246e..caf3da1a7 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NetconfDomAccessorImpl.java @@ -21,9 +21,16 @@ */ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom; +import static java.util.stream.Collectors.toList; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.ListenableFuture; +import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; @@ -34,6 +41,7 @@ import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; @@ -42,10 +50,23 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; import org.opendaylight.mdsal.dom.api.DOMMountPoint; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -55,16 +76,29 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class); - private final DOMDataBroker dataBroker; - private final DOMMountPoint mountpoint; - private final DomContext domContext; + private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription"); + private static final SchemaPath RPC_PATH_CREATE_SUBSCRIPTION = + NetconfMessageTransformUtil.toPath(CREATE_SUBSCRIPTION); + private static final YangInstanceIdentifier STREAMS_PATH = + YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build(); - public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker, DOMMountPoint mountPoint, - DomContext domContext) { + protected final DOMDataBroker dataBroker; + protected final DOMMountPoint mountpoint; + protected final DomContext domContext; + private final DOMNotificationService notificationService; + private final BindingNormalizedNodeSerializer serializer; + private final DOMRpcService rpcService; + + + public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker, + DOMMountPoint mountpoint, DomContext domContext) { super(accessor); this.dataBroker = Objects.requireNonNull(domDataBroker); - this.mountpoint = Objects.requireNonNull(mountPoint); + this.mountpoint = Objects.requireNonNull(mountpoint); this.domContext = Objects.requireNonNull(domContext); + this.serializer = domContext.getBindingNormalizedNodeSerializer(); + this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class); + this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class); } @Override @@ -78,28 +112,6 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco } @Override - public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener( - @NonNull T listener, Collection<SchemaPath> types) { - LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString()); - final Optional<DOMNotificationService> optionalNotificationService = - mountpoint.getService(DOMNotificationService.class); - if (optionalNotificationService.isPresent()) { - final ListenerRegistration<DOMNotificationListener> ranListenerRegistration = - optionalNotificationService.get().registerNotificationListener(listener, types); - LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}", - mountpoint.getIdentifier().toString(), optionalNotificationService, ranListenerRegistration); - return ranListenerRegistration; - } - throw new IllegalArgumentException("Can not get notification service"); - } - - @Override - public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener( - @NonNull T listener, SchemaPath[] types) { - return doRegisterNotificationListener(listener, Arrays.asList(types)); - } - - @Override public <T extends DataObject> Optional<T> readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path, Class<T> clazz) { LOG.debug("Read to object datastore:{} path:{}", dataStoreType, path); @@ -121,11 +133,11 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco try (DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction()) { FluentFuture<Optional<NormalizedNode<?, ?>>> foData = readOnlyTransaction.read(dataStoreType, path); // RAVI - Add a few debug here, like what ? Speak to Micha.... - + Optional<NormalizedNode<?, ?>> data = foData.get(120, TimeUnit.SECONDS); LOG.info("read is done - {} ", foData.isDone()); return data; - + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.info("Incomplete read to node transaction {} {}", dataStoreType, path, e); return Optional.empty(); @@ -138,7 +150,8 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco throws CanNotConvertException { if (oData.isPresent()) { NormalizedNode<?, ?> data = oData.get(); - LOG.debug("data identifier: {}", data.getIdentifier()); + LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(), + data.getNodeType()); @Nullable Entry<InstanceIdentifier<?>, DataObject> entry = serializer.fromNormalizedNode(path, data); if (entry != null) { @@ -158,4 +171,113 @@ public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements Netco throw new CanNotConvertException("No data received for path:" + path); } } + + @Override + public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener( + @NonNull T listener, Collection<SchemaPath> types) { + LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString()); + + final ListenerRegistration<DOMNotificationListener> ranListenerRegistration = + notificationService.registerNotificationListener(listener, types); + + LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}", + mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration); + + return ranListenerRegistration; + } + + @Override + public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener( + @NonNull T listener, SchemaPath[] types) { + return doRegisterNotificationListener(listener, Arrays.asList(types)); + } + + @Override + public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener( + @NonNull T listener, QName[] types) { + List<SchemaPath> schemaPathList = Arrays.stream(types).map(qname -> NetconfMessageTransformUtil.toPath(qname)).collect(toList()); + return doRegisterNotificationListener(listener, schemaPathList); + } + + + @Override + public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(CreateSubscriptionInput input) { + final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input); + return rpcService.invokeRpc(RPC_PATH_CREATE_SUBSCRIPTION, nnInput); + } + + @Override + public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Optional<Stream> oStream, + Optional<Filter> filter, Optional<Instant> startTime, Optional<Instant> stopTime) { + + CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder(); + boolean replayIsSupported = false; + if (oStream.isPresent()) { + Stream stream = oStream.get(); + if (stream.getName() != null) { + inputBuilder.setStream(stream.getName()); + } + replayIsSupported = Boolean.TRUE.equals(stream.isReplaySupport()); + + } + if (filter.isPresent()) { + inputBuilder.setFilter(filter.get()); + } + if (startTime.isPresent()) { + if (replayIsSupported) { + inputBuilder.setStartTime(getDateAndTime(startTime.get())); + if (stopTime.isPresent()) { + if (startTime.get().isBefore(stopTime.get())) { + inputBuilder.setStopTime(getDateAndTime(stopTime.get())); + } else { + throw new IllegalArgumentException("stopTime must be later than startTime"); + } + } + } else { + throw new IllegalArgumentException("Replay not supported by this stream."); + } + } + return invokeCreateSubscription(inputBuilder.build()); + } + + @Override + public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Stream... streams) { + ListenableFuture<? extends DOMRpcResult> res; + if (streams.length == 0) { + return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + } else if (streams.length == 1) { + return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(), + Optional.empty()); + } else { + for (Stream stream : streams) { + res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(), + Optional.empty()); + try { + if (!res.get().getErrors().isEmpty()) { + return res; + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Exception during rpc call", e); + return res; + } + } + } + throw new IllegalStateException("Could never be reached"); //avoid eclipse error + } + + @Override + public @NonNull Map<StreamKey, Stream> getNotificationStreamsAsMap() { + Optional<Streams> oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class); + return oStreams.isPresent() ? oStreams.get().nonnullStream() : Collections.emptyMap(); + } + + @Override + public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() { + return serializer; + } + + private DateAndTime getDateAndTime(Instant dateTime) { + final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime); + return new DateAndTime(formattedDate); + } } diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java new file mode 100644 index 000000000..c66a20c32 --- /dev/null +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/access/dom/NotificationServiceNotProvided.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom; + +public class NotificationServiceNotProvided extends Exception { + + private static final long serialVersionUID = 1L; + + public NotificationServiceNotProvided() { + super(); + } + + public NotificationServiceNotProvided(String message) { + super(message); + } + + public NotificationServiceNotProvided(String message, Throwable cause) { + super(message, cause); + } + + public NotificationServiceNotProvided(Throwable cause) { + super(cause); + } + +} diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java new file mode 100644 index 000000000..6f06641af --- /dev/null +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/mdsal/MdsalApi.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal; + +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMService; + +/** + * Collecting utilities for mdsal api + */ +public class MdsalApi { + /** + * Get mountpoint service and throw exception if not available + * @param <T> + * @param mountPoint getting the service from + * @param service class to request + * @return service requested or throw + * @throws IllegalStateException + */ + public static <T extends DOMService> T getMountpointService(final DOMMountPoint mountPoint, final Class<T> service) { + final Optional<T> optional = mountPoint.getService(service); + Preconditions.checkState(optional.isPresent(), "Service not present on mount point: %s", service.getName()); + return optional.get(); + } + +} diff --git a/sdnr/wt/netconfnode-state-service/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/test/TestNetconfAccessorImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/test/TestNetconfAccessorImpl.java index dd61db0d1..10c3b2697 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/test/TestNetconfAccessorImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/test/TestNetconfAccessorImpl.java @@ -23,28 +23,41 @@ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.test; import static org.junit.Assert.assertEquals; import java.util.Arrays; +import java.util.Collection; import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNotifications; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.NetconfNodeStateServiceImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingNotificationsImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.binding.NetconfBindingAccessorImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.NetconfDomAccessorImpl; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.test.example.TestNetconfHelper; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.MountPoint; import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; public class TestNetconfAccessorImpl extends Mockito { @@ -84,8 +97,8 @@ public class TestNetconfAccessorImpl extends Mockito { when(mountPoint.getService(RpcConsumerRegistry.class)).thenReturn(Optional.of(rpcComerRegistry)); //Start here - NetconfBindingNotificationsImpl test = - new NetconfBindingNotificationsImpl(netconfAccessor, dataBroker, mountPoint); + NetconfBindingAccessorImpl test = + new NetconfBindingAccessorImpl(netconfAccessor, dataBroker, mountPoint); String streamName = "NETCONF"; test.registerNotificationsStream(streamName); @@ -114,10 +127,10 @@ public class TestNetconfAccessorImpl extends Mockito { //Start here - NetconfBindingNotificationsImpl test = - new NetconfBindingNotificationsImpl(netconfAccessor, dataBroker, mountPoint); + NetconfBindingAccessorImpl test = + new NetconfBindingAccessorImpl(netconfAccessor, dataBroker, mountPoint); - String streamName = NetconfNotifications.DefaultNotificationsStream+"ChangeIt"; + String streamName = NetconfAccessor.DefaultNotificationsStream + "ChangeIt"; StreamNameType streamNameType = new StreamNameType(streamName); Stream stream = new StreamBuilder().setName(streamNameType).build(); test.registerNotificationsStream(Arrays.asList(stream)); @@ -130,4 +143,55 @@ public class TestNetconfAccessorImpl extends Mockito { } + @Test + public void testNetconfDomNotification() { + + DOMDataBroker domDataBroker = mock(DOMDataBroker.class); + DomContext domContext = mock(DomContext.class); + DOMRpcService domRpcService = mock(DOMRpcService.class); + NetconfAccessorImpl netconfAccessor = TestNetconfHelper.getNetconfAcessorImpl(); + DOMNotificationService domNotificationService = mock(DOMNotificationService.class); + DOMMountPoint domMountPoint = mock(DOMMountPoint.class); + + when(domNotificationService.registerNotificationListener(any(DOMNotificationListener.class), + ArgumentMatchers.<Collection<SchemaPath>>any())) + .thenReturn(new ListenerRegistration<DOMNotificationListener>() { + @Override + public @NonNull DOMNotificationListener getInstance() { + return null; + } + + @Override + public void close() {} + }); + + YangInstanceIdentifier mountpointPath = YangInstanceIdentifier.builder().node(NetworkTopology.QNAME).build(); + when(domMountPoint.getIdentifier()).thenReturn(mountpointPath); + when(domMountPoint.getService(DOMNotificationService.class)).thenReturn(Optional.of(domNotificationService)); + when(domMountPoint.getService(DOMRpcService.class)).thenReturn(Optional.of(domRpcService)); + + /* Remark: Throws WARN java.lang.UnsupportedOperationException + * "[main] WARN org.opendaylight.netconf.util.NetconfUtil - + * Unable to set namespace context, falling back to setPrefix() + * during initialization." + */ + NetconfDomAccessorImpl netconfDomAccessor = + new NetconfDomAccessorImpl(netconfAccessor, domDataBroker, domMountPoint, domContext); + + Collection<SchemaPath> types = Arrays.asList(SchemaPath.create(false, NetworkTopology.QNAME)); + DOMNotificationListener listener = (notification) -> System.out.println("Notification: " + notification); + ListenerRegistration<DOMNotificationListener> res = + netconfDomAccessor.doRegisterNotificationListener(listener, types); + + //Capture parameters and assert them + ArgumentCaptor<DOMNotificationListener> captor1 = ArgumentCaptor.forClass(DOMNotificationListener.class); + @SuppressWarnings("unchecked") + ArgumentCaptor<Collection<SchemaPath>> captor2 = ArgumentCaptor.forClass(Collection.class); + verify(domNotificationService).registerNotificationListener(captor1.capture(), captor2.capture()); + + assertEquals("Listener", listener, captor1.getValue()); + assertEquals("SchemaPath", types, captor2.getValue()); + res.close(); + } + } |