diff options
author | 2019-02-28 17:43:35 +0100 | |
---|---|---|
committer | 2019-02-28 17:46:05 +0100 | |
commit | b4d7ca85b122f914fb43a4a3f4300e36dede9e2b (patch) | |
tree | 3f138a832f0bf10fa59eb636b98bd6912b8a3efd /sdnr/wt/websocketmanager2/provider/src/main/java | |
parent | 49b155ec687cdf58fb51fe8245a2f5f4582b68f0 (diff) |
Replace depreciated MDSAL interfaces
SDN-R websocketmanager2
Change-Id: I69c12ee993bf8e2740d8f72778d06c89bf758051
Issue-ID: SDNC-680
Signed-off-by: Herbert Eiselt <herbert.eiselt@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager2/provider/src/main/java')
5 files changed, 173 insertions, 106 deletions
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java index 32d9d7a4b..bd84043a2 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java @@ -17,19 +17,19 @@ ******************************************************************************/ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; public abstract class Blueprint implements AutoCloseable { - private RpcProviderRegistry rpcProviderRegistry = null; + private RpcProviderService rpcProviderRegistry = null; public abstract void init(); - public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) { + public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) { this.rpcProviderRegistry = rpcProviderRegistry; } - public RpcProviderRegistry getRpcProviderRegistry() { + public RpcProviderService getRpcProviderRegistry() { return this.rpcProviderRegistry; } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java index 18066128f..51554e3a6 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java @@ -18,9 +18,13 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; import com.google.common.util.concurrent.ListenableFuture; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.json.JSONObject; @@ -47,7 +51,7 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag private static final String APPLICATION_NAME = WebSocketManager.class.getName(); private static final int PORT = 8181; private final EventInputCallback rpcEventInputCallback; - + private final AkkaConfig akkaConfig; /** * timeout for websocket with no messages in ms */ @@ -56,16 +60,25 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag private final ArrayList<URI> clusterNodeClients = new ArrayList<>(); public WebSocketManager() { - super(); - rpcEventInputCallback = message -> { - LOG.debug("onMessagePushed: " + message); - SyncWebSocketClient client; - for (URI clientURI : WebSocketManager.this.clusterNodeClients) { - client = new SyncWebSocketClient(clientURI); - LOG.debug("try to push message to " + client.getURI()); - client.openAndSendAndCloseSync(message); + this(null, null); + } + + public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) { + super(); + this.akkaConfig = akkaconfig; + if (cb != null) { + this.rpcEventInputCallback = cb; + } else { + this.rpcEventInputCallback = message -> { + LOG.debug("onMessagePushed: " + message); + SyncWebSocketClient client; + for (URI clientURI : WebSocketManager.this.clusterNodeClients) { + client = new SyncWebSocketClient(clientURI); + LOG.debug("try to push message to " + client.getURI()); + client.openAndSendAndCloseSync(message); + } + }; } - }; LOG.info("Create servlet for {}", APPLICATION_NAME); } @@ -80,11 +93,13 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag // register Socket as the WebSocket to create on Upgrade factory.register(WebSocketManagerSocket.class); - AkkaConfig cfg = null; - try { - cfg = AkkaConfig.load(); - } catch (Exception e) { - LOG.warn("problem loading akka config: " + e.getMessage()); + AkkaConfig cfg = this.akkaConfig; + if (cfg == null) { + try { + cfg = AkkaConfig.load(); + } catch (Exception e) { + LOG.warn("problem loading akka config: " + e.getMessage()); + } } if (cfg != null && cfg.isCluster()) { this.initWSClients(cfg.getClusterConfig()); @@ -96,25 +111,30 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) { LOG.debug("Send message '{}'", input); RpcResultBuilder<WebsocketEventOutput> result; - try { - WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder(); - final String s = input.getXmlEvent(); - WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), s); - outputBuilder.setResponse("OK"); + + final String eventAsXmlString = input.getXmlEvent(); + if (eventAsXmlString != null) { + WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString); try { JSONObject o = new JSONObject(); o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName()); o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType()); o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent()); this.rpcEventInputCallback.onMessagePushed(o.toString()); + + WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder(); + outputBuilder.setResponse("OK"); + result = RpcResultBuilder.success(outputBuilder); } catch (Exception err) { LOG.warn("problem pushing messsage to other nodes: " + err.getMessage()); + result = RpcResultBuilder.failed(); + result.withError(ErrorType.APPLICATION, "Exception", err); } - result = RpcResultBuilder.success(outputBuilder); - } catch (Exception e) { - LOG.warn("Socketproblem: {}", e); + } else { + String msg = "Emtpy event received"; + LOG.warn(msg); result = RpcResultBuilder.failed(); - result.withError(ErrorType.APPLICATION, "Exception", e); + result.withError(ErrorType.APPLICATION, msg); } return result.buildFuture(); } @@ -123,6 +143,17 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag * Private functions */ + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + if (req.getHeader("Upgrade") != null) { + /* Accept upgrade request */ + resp.setStatus(101); + resp.setHeader("Upgrade", "XYZP"); + resp.setHeader("Connection", "Upgrade"); + resp.setHeader("OtherHeaderB", "Value"); + } + } + private void initWSClients(ClusterConfig clusterConfig) { for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) { if (clusterConfig.isMe(nodeConfig)) { diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java index 737fe5463..2c54d2a78 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java @@ -6,9 +6,9 @@ * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -18,10 +18,9 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2; import javax.servlet.ServletException; - -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService; +import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.osgi.service.http.HttpService; import org.osgi.service.http.NamespaceException; import org.slf4j.Logger; @@ -29,67 +28,73 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerProvider extends Blueprint { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class); - private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName(); - private static final String ALIAS = "/websocket"; + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class); + private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName(); + private static final String ALIAS = "/websocket"; + + private WebSocketManager wsServlet = null; + private ObjectRegistration<WebSocketManager> websocketService = null; + + public WebSocketManagerProvider() { + LOG.info("Creating provider for {}", APPLICATION_NAME); + } - private WebSocketManager wsServlet = null; - private RpcRegistration<WebsocketmanagerService> websocketService = null; - - public WebSocketManagerProvider() { - LOG.info("Creating provider for {}", APPLICATION_NAME); - } + @Override + public void init() { + LOG.info("Init provider for {}", APPLICATION_NAME); + RpcProviderService rpcProviderRegistry = this.getRpcProviderRegistry(); + if (rpcProviderRegistry != null) { + if (wsServlet != null) { + this.websocketService = + rpcProviderRegistry.registerRpcImplementation(WebsocketmanagerService.class, wsServlet); + LOG.info("websocketservice initialized"); + } else { + LOG.debug("wsServlet not yet provided"); + } + } else { + LOG.error("rpcProviderRegistry not provided"); + } + } - @Override - public void init() { - LOG.info("Init provider for {}", APPLICATION_NAME); - RpcProviderRegistry rpcProviderRegistry = this.getRpcProviderRegistry(); - if (rpcProviderRegistry != null) { - if (wsServlet != null) { - this.websocketService = rpcProviderRegistry.addRpcImplementation(WebsocketmanagerService.class, - wsServlet); - } else { - LOG.error("wsServlet not provided"); - } - } else { - LOG.error("rpcProviderRegistry not provided"); - } - } + @Override + public void close() throws Exception { + LOG.info("Close provider for {}", APPLICATION_NAME); + if (websocketService != null) { + websocketService.close(); + } + } - @Override - public void close() throws Exception { - LOG.info("Close provider for {}", APPLICATION_NAME); - if (websocketService != null) { - websocketService.close(); - } - } + public void onUnbindService(HttpService httpService) { + httpService.unregister(ALIAS); + wsServlet = null; + } - public void onUnbindService(HttpService httpService) { - httpService.unregister(ALIAS); - wsServlet = null; - } + public void onBindService(HttpService httpService) throws ServletException, NamespaceException { + if (httpService == null) { + LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService"); + } else { - public void onBindService(HttpService httpService) throws ServletException, NamespaceException { - if (httpService == null) { - LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService"); - } else { - - wsServlet = new WebSocketManager(); - httpService.registerServlet(ALIAS, wsServlet, null, null); - LOG.info("websocket servlet registered."); - if(this.websocketService==null) - this.init(); - else - LOG.info("websocketservice already initialized"); - } + if (wsServlet == null) { + wsServlet = new WebSocketManager(); + httpService.registerServlet(ALIAS, wsServlet, null, null); + LOG.info("websocket servlet registered."); + if (this.websocketService == null) { + this.init(); + } else { + LOG.info("websocketservice already initialized"); + } + } else { + LOG.warn("Servelt "); + } + } - } + } - public WebSocketManager getWsServlet() { - return wsServlet; - } + public WebSocketManager getWsServlet() { + return wsServlet; + } - public void setWsServlet(WebSocketManager wsServlet) { - this.wsServlet = wsServlet; - } + public void setWsServlet(WebSocketManager wsServlet) { + this.wsServlet = wsServlet; + } } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java index f445bcd23..0fd2cae4e 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory; public class WebSocketManagerSocket extends WebSocketAdapter { private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName()); - public static final String MSG_KEY_DATA = "data"; public static final String MSG_KEY_SCOPES = "scopes"; public static final String MSG_KEY_PARAM = "param"; @@ -87,16 +88,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketText(String message) { - LOG.info(this.getRemoteAdr() + " has sent " + message); + LOG.info("{} has sent {}",this.getRemoteAdr(), message); if (!this.manageClientRequest(message)) { this.manageClientRequest2(message); } - } @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - + LOG.debug("Binary not supported"); } @Override @@ -114,7 +114,6 @@ public class WebSocketManagerSocket extends WebSocketAdapter { @Override public void onWebSocketError(Throwable cause) { - LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage()); // super.onWebSocketError(cause); } @@ -166,14 +165,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter { try { JSONObject o = new JSONObject(request); if (o.has(KEY_NODENAME) && o.has(KEY_EVENTTYPE)) { - broadCast(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT)); + this.sendToAll(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT)); } } catch (Exception e) { LOG.warn("handle ws request failed:" + e.getMessage()); } } - private void send(String msg) { + public void send(String msg) { try { LOG.trace("sending {}", msg); this.session.getRemote().sendString(msg); @@ -181,18 +180,16 @@ public class WebSocketManagerSocket extends WebSocketAdapter { LOG.warn("problem sending message: " + e.getMessage()); } } - - private String getSessionId() { + public String getSessionId() { return this.myUniqueSessionId; } - public static void broadCast(String nodeName, String eventType, String xmlEvent) { - if (clientList != null && clientList.size() > 0) { + private void sendToAll(String nodeName, String eventType, String xmlEvent) { + if (clientList.size() > 0) { for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) { WebSocketManagerSocket socket = entry.getValue(); if (socket != null) { try { - UserScopes clientScopes = userScopesList.get(socket.getSessionId()); if (clientScopes != null) { if (clientScopes.hasScope(eventType)) { @@ -212,5 +209,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter { } } } + public static void broadCast(String nodeName, String eventType, String xmlEvent) { + if(clientList.size()>0) { + Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet(); + WebSocketManagerSocket s = e.iterator().next().getValue(); + if(s!=null) + { + s.sendToAll(nodeName, eventType, xmlEvent); + } + } + } } diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java index 0afe06e13..fb2384ee5 100644 --- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java +++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java @@ -19,6 +19,8 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; @@ -27,33 +29,53 @@ import org.slf4j.LoggerFactory; public class SyncWebSocketClient extends WebSocketClient { + public interface WebsocketEventHandler{ + void onMessageReceived(String message); + void onOpen(ServerHandshake arg0); + void onClose(int arg0, String arg1, boolean arg2); + void onError(Exception e); + } + private static final Logger LOG = LoggerFactory.getLogger(SyncWebSocketClient.class.getName()); private String messageToSend; - + private final List<WebsocketEventHandler> handlers; public SyncWebSocketClient(URI serverUri) { super(serverUri); + this.handlers = new ArrayList<WebsocketEventHandler>(); } public SyncWebSocketClient(String uri) throws URISyntaxException { this(new URI(uri)); } - + public void addEventHandler(WebsocketEventHandler h) { + this.handlers.add(h); + } + public void removeEventHandler(WebsocketEventHandler h) { + this.handlers.remove(h); + } + @Override public void onClose(int arg0, String arg1, boolean arg2) { LOG.debug("socket closed: {} {} {}", arg0, arg1, arg2); - + for(WebsocketEventHandler h:this.handlers) { + h.onClose(arg0,arg1,arg2); + } } @Override public void onError(Exception arg0) { LOG.warn("error on socket: {}", arg0.getMessage()); - + for(WebsocketEventHandler h:this.handlers) { + h.onError(arg0); + } } @Override public void onMessage(String arg0) { LOG.debug("received message: {}", arg0); - + for(WebsocketEventHandler h:this.handlers) { + h.onMessageReceived(arg0); + } } @Override @@ -64,7 +86,9 @@ public class SyncWebSocketClient extends WebSocketClient { this.send(this.messageToSend); this.messageToSend = null; } - + for(WebsocketEventHandler h:this.handlers) { + h.onOpen(arg0); + } } public void openAndSendAsync(String message) { |