diff options
Diffstat (limited to 'sdnr/wt/websocketmanager2/provider/src/main')
6 files changed, 180 insertions, 129 deletions
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java index 32d9d7a4b..bd84043a2 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java @@ -17,19 +17,19 @@ ******************************************************************************/ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; public abstract class Blueprint implements AutoCloseable { - private RpcProviderRegistry rpcProviderRegistry = null; + private RpcProviderService rpcProviderRegistry = null; public abstract void init(); - public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) { + public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) { this.rpcProviderRegistry = rpcProviderRegistry; } - public RpcProviderRegistry getRpcProviderRegistry() { + public RpcProviderService getRpcProviderRegistry() { return this.rpcProviderRegistry; } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java index 18066128f..51554e3a6 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java @@ -18,9 +18,13 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; import com.google.common.util.concurrent.ListenableFuture; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.json.JSONObject; @@ -47,7 +51,7 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag private static final String APPLICATION_NAME = WebSocketManager.class.getName(); private static final int PORT = 8181; private final EventInputCallback rpcEventInputCallback; - + private final AkkaConfig akkaConfig; /** * timeout for websocket with no messages in ms */ @@ -56,16 +60,25 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag private final ArrayList<URI> clusterNodeClients = new ArrayList<>(); public WebSocketManager() { - super(); - rpcEventInputCallback = message -> { - LOG.debug("onMessagePushed: " + message); - SyncWebSocketClient client; - for (URI clientURI : WebSocketManager.this.clusterNodeClients) { - client = new SyncWebSocketClient(clientURI); - LOG.debug("try to push message to " + client.getURI()); - client.openAndSendAndCloseSync(message); + this(null, null); + } + + public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) { + super(); + this.akkaConfig = akkaconfig; + if (cb != null) { + this.rpcEventInputCallback = cb; + } else { + this.rpcEventInputCallback = message -> { + LOG.debug("onMessagePushed: " + message); + SyncWebSocketClient client; + for (URI clientURI : WebSocketManager.this.clusterNodeClients) { + client = new SyncWebSocketClient(clientURI); + LOG.debug("try to push message to " + client.getURI()); + client.openAndSendAndCloseSync(message); + } + }; } - }; LOG.info("Create servlet for {}", APPLICATION_NAME); } @@ -80,11 +93,13 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag // register Socket as the WebSocket to create on Upgrade factory.register(WebSocketManagerSocket.class); - AkkaConfig cfg = null; - try { - cfg = AkkaConfig.load(); - } catch (Exception e) { - LOG.warn("problem loading akka config: " + e.getMessage()); + AkkaConfig cfg = this.akkaConfig; + if (cfg == null) { + try { + cfg = AkkaConfig.load(); + } catch (Exception e) { + LOG.warn("problem loading akka config: " + e.getMessage()); + } } if (cfg != null && cfg.isCluster()) { this.initWSClients(cfg.getClusterConfig()); @@ -96,25 +111,30 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) { LOG.debug("Send message '{}'", input); RpcResultBuilder<WebsocketEventOutput> result; - try { - WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder(); - final String s = input.getXmlEvent(); - WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), s); - outputBuilder.setResponse("OK"); + + final String eventAsXmlString = input.getXmlEvent(); + if (eventAsXmlString != null) { + WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString); try { JSONObject o = new JSONObject(); o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName()); o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType()); o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent()); this.rpcEventInputCallback.onMessagePushed(o.toString()); + + WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder(); + outputBuilder.setResponse("OK"); + result = RpcResultBuilder.success(outputBuilder); } catch (Exception err) { LOG.warn("problem pushing messsage to other nodes: " + err.getMessage()); + result = RpcResultBuilder.failed(); + result.withError(ErrorType.APPLICATION, "Exception", err); } - result = RpcResultBuilder.success(outputBuilder); - } catch (Exception e) { - LOG.warn("Socketproblem: {}", e); + } else { + String msg = "Emtpy event received"; + LOG.warn(msg); result = RpcResultBuilder.failed(); - result.withError(ErrorType.APPLICATION, "Exception", e); + result.withError(ErrorType.APPLICATION, msg); } return result.buildFuture(); } @@ -123,6 +143,17 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag * Private functions */ + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + if (req.getHeader("Upgrade") != null) { + /* Accept upgrade request */ + resp.setStatus(101); + resp.setHeader("Upgrade", "XYZP"); + resp.setHeader("Connection", "Upgrade"); + resp.setHeader("OtherHeaderB", "Value"); + } + } + private void initWSClients(ClusterConfig clusterConfig) { for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) { if (clusterConfig.isMe(nodeConfig)) { diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java index 737fe5463..2c54d2a78 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java @@ -6,9 +6,9 @@ * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -18,10 +18,9 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; import javax.servlet.ServletException; - -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService; +import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.osgi.service.http.HttpService; import org.osgi.service.http.NamespaceException; import org.slf4j.Logger; @@ -29,67 +28,73 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerProvider extends Blueprint { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class); - private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName(); - private static final String ALIAS = "/websocket"; + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class); + private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName(); + private static final String ALIAS = "/websocket"; + + private WebSocketManager wsServlet = null; + private ObjectRegistration<WebSocketManager> websocketService = null; + + public WebSocketManagerProvider() { + LOG.info("Creating provider for {}", APPLICATION_NAME); + } - private WebSocketManager wsServlet = null; - private RpcRegistration<WebsocketmanagerService> websocketService = null; - - public WebSocketManagerProvider() { - LOG.info("Creating provider for {}", APPLICATION_NAME); - } + @Override + public void init() { + LOG.info("Init provider for {}", APPLICATION_NAME); + RpcProviderService rpcProviderRegistry = this.getRpcProviderRegistry(); + if (rpcProviderRegistry != null) { + if (wsServlet != null) { + this.websocketService = + rpcProviderRegistry.registerRpcImplementation(WebsocketmanagerService.class, wsServlet); + LOG.info("websocketservice initialized"); + } else { + LOG.debug("wsServlet not yet provided"); + } + } else { + LOG.error("rpcProviderRegistry not provided"); + } + } - @Override - public void init() { - LOG.info("Init provider for {}", APPLICATION_NAME); - RpcProviderRegistry rpcProviderRegistry = this.getRpcProviderRegistry(); - if (rpcProviderRegistry != null) { - if (wsServlet != null) { - this.websocketService = rpcProviderRegistry.addRpcImplementation(WebsocketmanagerService.class, - wsServlet); - } else { - LOG.error("wsServlet not provided"); - } - } else { - LOG.error("rpcProviderRegistry not provided"); - } - } + @Override + public void close() throws Exception { + LOG.info("Close provider for {}", APPLICATION_NAME); + if (websocketService != null) { + websocketService.close(); + } + } - @Override - public void close() throws Exception { - LOG.info("Close provider for {}", APPLICATION_NAME); - if (websocketService != null) { - websocketService.close(); - } - } + public void onUnbindService(HttpService httpService) { + httpService.unregister(ALIAS); + wsServlet = null; + } - public void onUnbindService(HttpService httpService) { - httpService.unregister(ALIAS); - wsServlet = null; - } + public void onBindService(HttpService httpService) throws ServletException, NamespaceException { + if (httpService == null) { + LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService"); + } else { - public void onBindService(HttpService httpService) throws ServletException, NamespaceException { - if (httpService == null) { - LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService"); - } else { - - wsServlet = new WebSocketManager(); - httpService.registerServlet(ALIAS, wsServlet, null, null); - LOG.info("websocket servlet registered."); - if(this.websocketService==null) - this.init(); - else - LOG.info("websocketservice already initialized"); - } + if (wsServlet == null) { + wsServlet = new WebSocketManager(); + httpService.registerServlet(ALIAS, wsServlet, null, null); + LOG.info("websocket servlet registered."); + if (this.websocketService == null) { + this.init(); + } else { + LOG.info("websocketservice already initialized"); + } + } else { + LOG.warn("Servelt "); + } + } - } + } - public WebSocketManager getWsServlet() { - return wsServlet; - } + public WebSocketManager getWsServlet() { + return wsServlet; + } - public void setWsServlet(WebSocketManager wsServlet) { - this.wsServlet = wsServlet; - } + public void setWsServlet(WebSocketManager wsServlet) { + this.wsServlet = wsServlet; + } } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java index f445bcd23..0fd2cae4e 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerSocket extends WebSocketAdapter { private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName()); - public static final String MSG_KEY_DATA = "data"; public static final String MSG_KEY_SCOPES = "scopes"; public static final String MSG_KEY_PARAM = "param"; @@ -87,16 +88,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketText(String message) { - LOG.info(this.getRemoteAdr() + " has sent " + message); + LOG.info("{} has sent {}",this.getRemoteAdr(), message); if (!this.manageClientRequest(message)) { this.manageClientRequest2(message); } - } @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - + LOG.debug("Binary not supported"); } @Override @@ -114,7 +114,6 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketError(Throwable cause) { - LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage()); // super.onWebSocketError(cause); } @@ -166,14 +165,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter { try { JSONObject o = new JSONObject(request); if (o.has(KEY_NODENAME) && o.has(KEY_EVENTTYPE)) { - broadCast(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT)); + this.sendToAll(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT)); } } catch (Exception e) { LOG.warn("handle ws request failed:" + e.getMessage()); } } - private void send(String msg) { + public void send(String msg) { try { LOG.trace("sending {}", msg); this.session.getRemote().sendString(msg); @@ -181,18 +180,16 @@ public class WebSocketManagerSocket extends WebSocketAdapter { LOG.warn("problem sending message: " + e.getMessage()); } } - - private String getSessionId() { + public String getSessionId() { return this.myUniqueSessionId; } - public static void broadCast(String nodeName, String eventType, String xmlEvent) { - if (clientList != null && clientList.size() > 0) { + private void sendToAll(String nodeName, String eventType, String xmlEvent) { + if (clientList.size() > 0) { for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) { WebSocketManagerSocket socket = entry.getValue(); if (socket != null) { try { - UserScopes clientScopes = userScopesList.get(socket.getSessionId()); if (clientScopes != null) { if (clientScopes.hasScope(eventType)) { @@ -212,5 +209,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter { } } } + public static void broadCast(String nodeName, String eventType, String xmlEvent) { + if(clientList.size()>0) { + Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet(); + WebSocketManagerSocket s = e.iterator().next().getValue(); + if(s!=null) + { + s.sendToAll(nodeName, eventType, xmlEvent); + } + } + } } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java index 0afe06e13..fb2384ee5 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java @@ -19,6 +19,8 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; @@ -27,33 +29,53 @@ import org.slf4j.LoggerFactory; public class SyncWebSocketClient extends WebSocketClient { + public interface WebsocketEventHandler{ + void onMessageReceived(String message); + void onOpen(ServerHandshake arg0); + void onClose(int arg0, String arg1, boolean arg2); + void onError(Exception e); + } + private static final Logger LOG = LoggerFactory.getLogger(SyncWebSocketClient.class.getName()); private String messageToSend; - + private final List<WebsocketEventHandler> handlers; public SyncWebSocketClient(URI serverUri) { super(serverUri); + this.handlers = new ArrayList<WebsocketEventHandler>(); } public SyncWebSocketClient(String uri) throws URISyntaxException { this(new URI(uri)); } - + public void addEventHandler(WebsocketEventHandler h) { + this.handlers.add(h); + } + public void removeEventHandler(WebsocketEventHandler h) { + this.handlers.remove(h); + } + @Override public void onClose(int arg0, String arg1, boolean arg2) { LOG.debug("socket closed: {} {} {}", arg0, arg1, arg2); - + for(WebsocketEventHandler h:this.handlers) { + h.onClose(arg0,arg1,arg2); + } } @Override public void onError(Exception arg0) { LOG.warn("error on socket: {}", arg0.getMessage()); - + for(WebsocketEventHandler h:this.handlers) { + h.onError(arg0); + } } @Override public void onMessage(String arg0) { LOG.debug("received message: {}", arg0); - + for(WebsocketEventHandler h:this.handlers) { + h.onMessageReceived(arg0); + } } @Override @@ -64,7 +86,9 @@ public class SyncWebSocketClient extends WebSocketClient { this.send(this.messageToSend); this.messageToSend = null; } - + for(WebsocketEventHandler h:this.handlers) { + h.onOpen(arg0); + } } public void openAndSendAsync(String message) { diff --git a/sdnr/wt/websocketmanager2/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml b/sdnr/wt/websocketmanager2/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml index 315921dbb..5e95b7b74 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml +++ b/sdnr/wt/websocketmanager2/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml @@ -7,9 +7,9 @@ ================================================================================================= Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under @@ -20,30 +20,14 @@ xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0" odl:use-default-for-reference-types="true"> -<!-- <bean id="wsServlet" --> -<!-- class="org.onap.ccsdk.sdnr.wt.websocketmanager2.WebSocketManager"> --> -<!-- </bean> --> - - <reference id="rpcProviderRegistry" - interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry" - odl:type="default" /> + <reference id="rpcProviderRegistry" interface="org.opendaylight.mdsal.binding.api.RpcProviderService" odl:type="default" /> - <bean id="provider" - class="org.onap.ccsdk.features.sdnr.wt.websocketmanager2.WebSocketManagerProvider" - init-method="init" destroy-method="close"> + <bean id="provider" class="org.onap.ccsdk.features.sdnr.wt.websocketmanager2.WebSocketManagerProvider" init-method="init" destroy-method="close"> <property name="rpcProviderRegistry" ref="rpcProviderRegistry" /> -<!-- <property name="wsServlet" ref="wsServlet" /> --> </bean> - <reference id="onBindService" availability="mandatory" activation="eager" interface="org.osgi.service.http.HttpService"> - <reference-listener ref="provider" bind-method="onBindService" unbind-method="onUnbindService"/> - </reference> -<!-- <service --> -<!-- interface="org.eclipse.jetty.websocket.servlet.WebSocketServlet" --> -<!-- ref="wsServlet"> --> -<!-- <service-properties> --> -<!-- <entry key="alias" value="/websocket" /> --> -<!-- </service-properties> --> -<!-- </service> --> + <reference id="onBindService" availability="mandatory" activation="eager" interface="org.osgi.service.http.HttpService"> + <reference-listener ref="provider" bind-method="onBindService" unbind-method="onUnbindService" /> + </reference> </blueprint> |