diff options
author | Vidyashree Rama <vidyashree.rama@huawei.com> | 2018-08-09 12:21:34 +0530 |
---|---|---|
committer | Dan Timoney <dt5972@att.com> | 2018-08-14 14:48:20 +0000 |
commit | 8e8ec2eb81e062010da230fae30626cb07c25bd1 (patch) | |
tree | 67cba985891a5ecd9ad492a574defd9c6a5146b0 /restconf-client/provider/src/main/java | |
parent | c29034ab5ea15d4c336f068ca8007ccebfad73e3 (diff) |
RestconfDiscoveryNode Plugin implementation
Initial code submit for supporting RestconfDiscoveryNode Plugin implementation
Issue-ID: CCSDK-374
Change-Id: Ieb0b622b135ea78ef58bd36dfe171f4117bc3328
Signed-off-by: Vidyashree Rama <vidyashree.rama@huawei.com>
Diffstat (limited to 'restconf-client/provider/src/main/java')
6 files changed, 637 insertions, 5 deletions
diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventHandler.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventHandler.java new file mode 100644 index 000000000..155656e27 --- /dev/null +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventHandler.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import org.glassfish.jersey.media.sse.InboundEvent; +import org.glassfish.jersey.media.sse.EventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener that can be registered to listen for notifications. + */ +class EventHandler implements EventListener { + private static final Logger log = LoggerFactory.getLogger(EventListener.class); + private RestconfDiscoveryNode node; + + public EventHandler(RestconfDiscoveryNode node) { + this.node = node; + } + + @Override + public void onEvent(InboundEvent event) { + String payload = event.readData(); + if (!node.eventQueue().offer(payload)) { + log.error("Unable to process event " + + payload + "as processing queue is full"); + throw new RuntimeException("Unable to process event " + + payload + + "as processing queue is full"); + } + } +} diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventProcessor.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventProcessor.java new file mode 100644 index 000000000..a85876cae --- /dev/null +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/EventProcessor.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import org.onap.ccsdk.sli.core.sli.SvcLogicContext; +import org.onap.ccsdk.sli.core.sli.SvcLogicException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.onap.ccsdk.sli.plugins.prop.JsonParser.convertToProperties; + +/** + * Processes the events from event queue and executes callback DG. + */ +class EventProcessor implements Runnable { + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private RestconfDiscoveryNode node; + + private static final String EVENT_SUBSCRIPTION_ID = "ietf-notification:notification" + + ".ietf-yang-push:push-change-update" + + ".subscription-id"; + + public EventProcessor(RestconfDiscoveryNode node) { + this.node = node; + } + + @Override + public void run() { + while(true) { + try { + String payload = node.eventQueue().take(); + Map<String, String> param = convertToProperties(payload); + String id = param.get(EVENT_SUBSCRIPTION_ID); + SubscriptionInfo info = node.subscriptionInfoMap().get(id); + if (info != null) { + SvcLogicContext ctx = new SvcLogicContext(); + for (Map.Entry<String, String> entry : param.entrySet()) { + ctx.setAttribute(entry.getKey(), entry.getValue()); + } + SvcLogicGraphInfo callbackDG = info.callBackDG(); + callbackDG.executeGraph(ctx); + } + } catch (InterruptedException | SvcLogicException e) { + log.error(e.getMessage()); + throw new RuntimeException(e.getMessage()); + } + } + } +} diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java index 9eaa67916..34bb2ee6e 100644 --- a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java @@ -1,25 +1,238 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + package org.onap.ccsdk.sli.plugins.restconfdiscovery; -import java.util.Map; +import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.SseFeature; import org.onap.ccsdk.sli.core.sli.SvcLogicContext; +import org.onap.ccsdk.sli.core.sli.SvcLogicException; +import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfapiCallNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; /** - * Created by root1 on 18/7/18. + * Representation of a plugin to subscribe for notification and then + * to handle the received notifications. */ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { + private static final Logger log = LoggerFactory.getLogger(RestconfDiscoveryNode.class); + + private ExecutorService executor = Executors.newCachedThreadPool(); + private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>(); + private RestconfapiCallNode restconfapiCallNode; + + private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>(); + private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(); + + private static final String SUBSCRIBER_ID = "subscriberId"; + private static final String RESPONSE_CODE = "response-code"; + private static final String RESPONSE_PREFIX = "responsePrefix"; + private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notifications:output.identifier"; + private static final String RESPONSE_CODE_200 = "200"; + private static final String SSE_URL = "sseConnectURL"; + + /** + * Creates an instance of RestconfDiscoveryNode and + * starts processing of event. + */ + public RestconfDiscoveryNode() { + ExecutorService e = Executors.newFixedThreadPool(20); + EventProcessor p = new EventProcessor(this); + for (int i = 0; i < 20; ++i) { + e.execute(p); + } + } @Override - public void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) { + public void establishSubscription(Map<String, String> paramMap, + SvcLogicContext ctx) throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + if (subscriberId == null) { + throw new SvcLogicException("Subscriber Id is null"); + } + restconfapiCallNode.sendRequest(paramMap, ctx); + + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + // TODO: save subscription id and subscriber in MYSQL + + establishPersistentConnection(paramMap, ctx, subscriberId); + } else { + log.info("Failed to subscribe " + subscriberId); + throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE)); + } } @Override public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) { - + // TODO: to be implemented } @Override public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) { + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + PersistentConnection conn = runnableInfo.get(id); + conn.terminate(); + runnableInfo.remove(id); + subscriptionInfoMap.remove(id); + } + } + + class PersistentConnection implements Runnable { + private String url; + private volatile boolean running = true; + + PersistentConnection(String url) { + this.url = url; + } + + private void terminate() { + running = false; + } + + @Override + public void run() { + Client client = ClientBuilder.newBuilder() + .register(SseFeature.class).build(); + WebTarget target = client.target(url); + EventSource eventSource = EventSource.target(target).build(); + eventSource.register(new EventHandler(RestconfDiscoveryNode.this)); + eventSource.open(); + log.info("Connected to SSE source"); + while (running) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + log.error("Exception: " + e.getMessage()); + } + } + eventSource.close(); + log.info("Closed connection to SSE source"); + } + } + + /** + * Establishes a persistent between the client and server. + * + * @param paramMap input paramter map + * @param ctx service logic context + * @param subscriberId subscriber identifier + */ + void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx, + String subscriberId) { + String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx); + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriptionId(id); + info.subscriberId(subscriberId); + subscriptionInfoMap.put(id, info); + + String url = paramMap.get(SSE_URL); + PersistentConnection connection = new PersistentConnection(url); + runnableInfo.put(id, connection); + executor.execute(connection); + } + + /** + * Returns response code. + * + * @param prefix prefix given in input parameter + * @param ctx service logic context + * @return response code + */ + String getResponseCode(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE); + } + + /** + * Returns subscription id from event. + * + * @param prefix prefix given in input parameter + * @param ctx service logic context + * @return subscription id from event + */ + String getOutputIdentifier(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER); + } + + private String getPrefix(String prefix) { + return prefix != null ? prefix + "." : ""; + } + + private String getSubscriptionId(String subscriberId) { + for (Map.Entry<String,SubscriptionInfo> entry + : subscriptionInfoMap.entrySet()) { + if (entry.getValue().subscriberId() + .equals(subscriberId)) { + return entry.getKey(); + } + } + return null; + } + + /** + * Returns restconfApiCallNode. + * + * @return restconfApiCallNode + */ + protected RestconfapiCallNode restconfapiCallNode() { + return restconfapiCallNode; + } + + /** + * Sets restconfApiCallNode. + * + * @param node restconfApiCallNode + */ + void restconfapiCallNode(RestconfapiCallNode node) { + restconfapiCallNode = node; + } + + Map<String, SubscriptionInfo> subscriptionInfoMap() { + return subscriptionInfoMap; + } + + void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) { + this.subscriptionInfoMap = subscriptionInfoMap; + } + + LinkedBlockingQueue<String> eventQueue() { + return eventQueue; + } + void eventQueue(LinkedBlockingQueue<String> eventQueue) { + this.eventQueue = eventQueue; } } diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SubscriptionInfo.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SubscriptionInfo.java new file mode 100644 index 000000000..4ed3660ca --- /dev/null +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SubscriptionInfo.java @@ -0,0 +1,122 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +/** + * Holder to store information of subscription. + */ +public class SubscriptionInfo { + private String subscriptionId; + private String subscriberId; + private SvcLogicGraphInfo callbackDG; + private String yangFilePath; + private String filterUrl; + + /** + * Returns callback DG. + * + * @return callback DG + */ + public SvcLogicGraphInfo callBackDG() { + return callbackDG; + } + + /** + * Sets callback DG. + * + * @param callbackDg callback DG + */ + public void callBackDG(SvcLogicGraphInfo callbackDg) { + this.callbackDG = callbackDg; + } + + /** + * Returns YANG file path. + * + * @return YANG file path + */ + public String yangFilePath() { + return yangFilePath; + } + + /** + * Sets YANG file path. + * + * @param yangFilePath yang file path + */ + public void yangFilePath(String yangFilePath) { + this.yangFilePath = yangFilePath; + } + + /** + * Returns filter URL. + * + * @return filter URL + */ + public String filterUrl() { + return filterUrl; + } + + /** + * Sets filter URL. + * + * @param filterUrl filter URL + */ + public void filterUrl(String filterUrl) { + this.filterUrl = filterUrl; + } + + /** + * Returns subscription Id. + * + * @return subscription Id + */ + public String subscriptionId() { + return subscriptionId; + } + + /** + * Sets subscription id. + * + * @param subscriptionId subscription id + */ + public void subscriptionId(String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + /** + * Returns subscription Id. + * + * @return subscription Id + */ + public String subscriberId() { + return subscriberId; + } + + /** + * Sets subscriber id. + * + * @param subscriberId subscriber id + */ + public void subscriberId(String subscriberId) { + this.subscriberId = subscriberId; + } +} diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java index 183d22297..dfe8cd5b7 100644 --- a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java @@ -59,7 +59,7 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { * @since 11.0.2 * @see String#split(String, int) */ - void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx); + void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException; /** * Allows directed graphs to modify a discovery subscription for a given subscriber. diff --git a/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicGraphInfo.java b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicGraphInfo.java new file mode 100644 index 000000000..725826bda --- /dev/null +++ b/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicGraphInfo.java @@ -0,0 +1,178 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import org.onap.ccsdk.sli.core.sli.SvcLogicContext; +import org.onap.ccsdk.sli.core.sli.SvcLogicException; +import org.onap.ccsdk.sli.core.sli.SvcLogicGraph; +import org.onap.ccsdk.sli.core.sli.SvcLogicStore; +import org.onap.ccsdk.sli.core.sli.provider.SvcLogicService; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +/** + * Holder to store callback directed graph info. + */ +class SvcLogicGraphInfo { + private String module; + private String rpc; + private String mode; + private String version; + + /** + * Creates an instance of SvcLogicGraphInfo. + * + * @param module module name of callback DG + * @param rpc rpc name of callback DG + * @param mode mode of callback DG + * @param version version of callback DG + */ + public SvcLogicGraphInfo(String module, String rpc, String mode, String version) { + this.module = module; + this.rpc = rpc; + this.mode = mode; + this.version = version; + } + + public SvcLogicGraphInfo() {} + + /** + * Returns module name of callback DG. + * + * @return module name of callback DG + */ + public String module() { + return module; + } + + /** + * Sets module of callback DG. + * + * @param module module name of the DG + */ + public void module(String module) { + this.module = module; + } + + /** + * Returns rpc of callback DG. + * + * @return rpc of callback DG + */ + public String rpc() { + return rpc; + } + + /** + * Sets rpc of callback DG. + * + * @param rpc rpc attribute of the DG + */ + public void rpc(String rpc) { + this.rpc = rpc; + } + + /** + * Returns mode of callback DG. + * + * @return mode of callback DG + */ + public String mode() { + return mode; + } + + /** + * Sets mode of DG. + * + * @param mode mode of the DG + */ + public void mode(String mode) { + this.mode = mode; + } + + /** + * Returns version of callback DG. + * + * @return version of callback DG + */ + public String version() { + return version; + } + + /** + * Sets version of DG. + * + * @param version version of the DG + */ + public void version(String version) { + this.version = version; + } + + /** + * Executes call back DG. + * + * @param ctx service logic context + * @throws SvcLogicException service logic error + */ + public void executeGraph(SvcLogicContext ctx) throws SvcLogicException { + SvcLogicService service = findSvcLogicService(); + if (service == null) { + throw new SvcLogicException("\"Could not get SvcLogicService reference\""); + } + + SvcLogicStore store = service.getStore(); + if (store != null) { + SvcLogicGraph subGraph = store.fetch(module, rpc, version, mode); + if (subGraph != null) { + ctx.setAttribute("subGraph", subGraph.toString()); + service.execute(subGraph, ctx); + } else { + throw new SvcLogicException("Failed to call child [" + module + + "," + rpc + "," + version + + "," + mode + "] because" + + " the" + " graph could" + + " not be found"); + } + } else { + throw new SvcLogicException("\"Could not get SvcLogicStore reference\""); + } + } + + private static SvcLogicService findSvcLogicService() throws SvcLogicException { + Bundle bundle = FrameworkUtil.getBundle(SvcLogicService.class); + if (bundle == null) { + throw new SvcLogicException("Cannot find bundle reference for " + + SvcLogicService.NAME); + } + + BundleContext bctx = bundle.getBundleContext(); + ServiceReference<SvcLogicService> sref = bctx.getServiceReference( + SvcLogicService.class); + if (sref != null) { + return bctx.getService(sref); + } else { + throw new SvcLogicException("Cannot find service reference for " + + SvcLogicService.NAME); + } + } +} |