summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/websocketmanager2/provider/src/main/java
diff options
context:
space:
mode:
authorHerbert Eiselt <herbert.eiselt@highstreet-technologies.com>2019-02-28 17:43:35 +0100
committerHerbert Eiselt <herbert.eiselt@highstreet-technologies.com>2019-02-28 17:46:05 +0100
commitb4d7ca85b122f914fb43a4a3f4300e36dede9e2b (patch)
tree3f138a832f0bf10fa59eb636b98bd6912b8a3efd /sdnr/wt/websocketmanager2/provider/src/main/java
parent49b155ec687cdf58fb51fe8245a2f5f4582b68f0 (diff)
Replace depreciated MDSAL interfaces
SDN-R websocketmanager2 Change-Id: I69c12ee993bf8e2740d8f72778d06c89bf758051 Issue-ID: SDNC-680 Signed-off-by: Herbert Eiselt <herbert.eiselt@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/websocketmanager2/provider/src/main/java')
-rw-r--r--sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java8
-rw-r--r--sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java79
-rw-r--r--sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java125
-rw-r--r--sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java31
-rw-r--r--sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java36
5 files changed, 173 insertions, 106 deletions
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java
index 32d9d7a4b..bd84043a2 100644
--- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java
+++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/Blueprint.java
@@ -17,19 +17,19 @@
******************************************************************************/
package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
public abstract class Blueprint implements AutoCloseable {
- private RpcProviderRegistry rpcProviderRegistry = null;
+ private RpcProviderService rpcProviderRegistry = null;
public abstract void init();
- public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
+ public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
this.rpcProviderRegistry = rpcProviderRegistry;
}
- public RpcProviderRegistry getRpcProviderRegistry() {
+ public RpcProviderService getRpcProviderRegistry() {
return this.rpcProviderRegistry;
}
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java
index 18066128f..51554e3a6 100644
--- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java
+++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManager.java
@@ -18,9 +18,13 @@
package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.json.JSONObject;
@@ -47,7 +51,7 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag
private static final String APPLICATION_NAME = WebSocketManager.class.getName();
private static final int PORT = 8181;
private final EventInputCallback rpcEventInputCallback;
-
+ private final AkkaConfig akkaConfig;
/**
* timeout for websocket with no messages in ms
*/
@@ -56,16 +60,25 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag
private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
public WebSocketManager() {
- super();
- rpcEventInputCallback = message -> {
- LOG.debug("onMessagePushed: " + message);
- SyncWebSocketClient client;
- for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
- client = new SyncWebSocketClient(clientURI);
- LOG.debug("try to push message to " + client.getURI());
- client.openAndSendAndCloseSync(message);
+ this(null, null);
+ }
+
+ public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
+ super();
+ this.akkaConfig = akkaconfig;
+ if (cb != null) {
+ this.rpcEventInputCallback = cb;
+ } else {
+ this.rpcEventInputCallback = message -> {
+ LOG.debug("onMessagePushed: " + message);
+ SyncWebSocketClient client;
+ for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
+ client = new SyncWebSocketClient(clientURI);
+ LOG.debug("try to push message to " + client.getURI());
+ client.openAndSendAndCloseSync(message);
+ }
+ };
}
- };
LOG.info("Create servlet for {}", APPLICATION_NAME);
}
@@ -80,11 +93,13 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag
// register Socket as the WebSocket to create on Upgrade
factory.register(WebSocketManagerSocket.class);
- AkkaConfig cfg = null;
- try {
- cfg = AkkaConfig.load();
- } catch (Exception e) {
- LOG.warn("problem loading akka config: " + e.getMessage());
+ AkkaConfig cfg = this.akkaConfig;
+ if (cfg == null) {
+ try {
+ cfg = AkkaConfig.load();
+ } catch (Exception e) {
+ LOG.warn("problem loading akka config: " + e.getMessage());
+ }
}
if (cfg != null && cfg.isCluster()) {
this.initWSClients(cfg.getClusterConfig());
@@ -96,25 +111,30 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag
public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) {
LOG.debug("Send message '{}'", input);
RpcResultBuilder<WebsocketEventOutput> result;
- try {
- WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
- final String s = input.getXmlEvent();
- WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), s);
- outputBuilder.setResponse("OK");
+
+ final String eventAsXmlString = input.getXmlEvent();
+ if (eventAsXmlString != null) {
+ WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString);
try {
JSONObject o = new JSONObject();
o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName());
o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType());
o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent());
this.rpcEventInputCallback.onMessagePushed(o.toString());
+
+ WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
+ outputBuilder.setResponse("OK");
+ result = RpcResultBuilder.success(outputBuilder);
} catch (Exception err) {
LOG.warn("problem pushing messsage to other nodes: " + err.getMessage());
+ result = RpcResultBuilder.failed();
+ result.withError(ErrorType.APPLICATION, "Exception", err);
}
- result = RpcResultBuilder.success(outputBuilder);
- } catch (Exception e) {
- LOG.warn("Socketproblem: {}", e);
+ } else {
+ String msg = "Emtpy event received";
+ LOG.warn(msg);
result = RpcResultBuilder.failed();
- result.withError(ErrorType.APPLICATION, "Exception", e);
+ result.withError(ErrorType.APPLICATION, msg);
}
return result.buildFuture();
}
@@ -123,6 +143,17 @@ public class WebSocketManager extends WebSocketServlet implements Websocketmanag
* Private functions
*/
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ if (req.getHeader("Upgrade") != null) {
+ /* Accept upgrade request */
+ resp.setStatus(101);
+ resp.setHeader("Upgrade", "XYZP");
+ resp.setHeader("Connection", "Upgrade");
+ resp.setHeader("OtherHeaderB", "Value");
+ }
+ }
+
private void initWSClients(ClusterConfig clusterConfig) {
for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
if (clusterConfig.isMe(nodeConfig)) {
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java
index 737fe5463..2c54d2a78 100644
--- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java
+++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerProvider.java
@@ -6,9 +6,9 @@
* =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -18,10 +18,9 @@
package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
import javax.servlet.ServletException;
-
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.osgi.service.http.HttpService;
import org.osgi.service.http.NamespaceException;
import org.slf4j.Logger;
@@ -29,67 +28,73 @@ import org.slf4j.LoggerFactory;
public class WebSocketManagerProvider extends Blueprint {
- private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class);
- private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName();
- private static final String ALIAS = "/websocket";
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class);
+ private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName();
+ private static final String ALIAS = "/websocket";
+
+ private WebSocketManager wsServlet = null;
+ private ObjectRegistration<WebSocketManager> websocketService = null;
+
+ public WebSocketManagerProvider() {
+ LOG.info("Creating provider for {}", APPLICATION_NAME);
+ }
- private WebSocketManager wsServlet = null;
- private RpcRegistration<WebsocketmanagerService> websocketService = null;
-
- public WebSocketManagerProvider() {
- LOG.info("Creating provider for {}", APPLICATION_NAME);
- }
+ @Override
+ public void init() {
+ LOG.info("Init provider for {}", APPLICATION_NAME);
+ RpcProviderService rpcProviderRegistry = this.getRpcProviderRegistry();
+ if (rpcProviderRegistry != null) {
+ if (wsServlet != null) {
+ this.websocketService =
+ rpcProviderRegistry.registerRpcImplementation(WebsocketmanagerService.class, wsServlet);
+ LOG.info("websocketservice initialized");
+ } else {
+ LOG.debug("wsServlet not yet provided");
+ }
+ } else {
+ LOG.error("rpcProviderRegistry not provided");
+ }
+ }
- @Override
- public void init() {
- LOG.info("Init provider for {}", APPLICATION_NAME);
- RpcProviderRegistry rpcProviderRegistry = this.getRpcProviderRegistry();
- if (rpcProviderRegistry != null) {
- if (wsServlet != null) {
- this.websocketService = rpcProviderRegistry.addRpcImplementation(WebsocketmanagerService.class,
- wsServlet);
- } else {
- LOG.error("wsServlet not provided");
- }
- } else {
- LOG.error("rpcProviderRegistry not provided");
- }
- }
+ @Override
+ public void close() throws Exception {
+ LOG.info("Close provider for {}", APPLICATION_NAME);
+ if (websocketService != null) {
+ websocketService.close();
+ }
+ }
- @Override
- public void close() throws Exception {
- LOG.info("Close provider for {}", APPLICATION_NAME);
- if (websocketService != null) {
- websocketService.close();
- }
- }
+ public void onUnbindService(HttpService httpService) {
+ httpService.unregister(ALIAS);
+ wsServlet = null;
+ }
- public void onUnbindService(HttpService httpService) {
- httpService.unregister(ALIAS);
- wsServlet = null;
- }
+ public void onBindService(HttpService httpService) throws ServletException, NamespaceException {
+ if (httpService == null) {
+ LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService");
+ } else {
- public void onBindService(HttpService httpService) throws ServletException, NamespaceException {
- if (httpService == null) {
- LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService");
- } else {
-
- wsServlet = new WebSocketManager();
- httpService.registerServlet(ALIAS, wsServlet, null, null);
- LOG.info("websocket servlet registered.");
- if(this.websocketService==null)
- this.init();
- else
- LOG.info("websocketservice already initialized");
- }
+ if (wsServlet == null) {
+ wsServlet = new WebSocketManager();
+ httpService.registerServlet(ALIAS, wsServlet, null, null);
+ LOG.info("websocket servlet registered.");
+ if (this.websocketService == null) {
+ this.init();
+ } else {
+ LOG.info("websocketservice already initialized");
+ }
+ } else {
+ LOG.warn("Servelt ");
+ }
+ }
- }
+ }
- public WebSocketManager getWsServlet() {
- return wsServlet;
- }
+ public WebSocketManager getWsServlet() {
+ return wsServlet;
+ }
- public void setWsServlet(WebSocketManager wsServlet) {
- this.wsServlet = wsServlet;
- }
+ public void setWsServlet(WebSocketManager wsServlet) {
+ this.wsServlet = wsServlet;
+ }
}
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java
index f445bcd23..0fd2cae4e 100644
--- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java
+++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/WebSocketManagerSocket.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory;
public class WebSocketManagerSocket extends WebSocketAdapter {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName());
-
public static final String MSG_KEY_DATA = "data";
public static final String MSG_KEY_SCOPES = "scopes";
public static final String MSG_KEY_PARAM = "param";
@@ -87,16 +88,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketText(String message) {
- LOG.info(this.getRemoteAdr() + " has sent " + message);
+ LOG.info("{} has sent {}",this.getRemoteAdr(), message);
if (!this.manageClientRequest(message)) {
this.manageClientRequest2(message);
}
-
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len) {
-
+ LOG.debug("Binary not supported");
}
@Override
@@ -114,7 +114,6 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
@Override
public void onWebSocketError(Throwable cause) {
-
LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
// super.onWebSocketError(cause);
}
@@ -166,14 +165,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
try {
JSONObject o = new JSONObject(request);
if (o.has(KEY_NODENAME) && o.has(KEY_EVENTTYPE)) {
- broadCast(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT));
+ this.sendToAll(o.getString(KEY_NODENAME), o.getString(KEY_EVENTTYPE), o.getString(KEY_XMLEVENT));
}
} catch (Exception e) {
LOG.warn("handle ws request failed:" + e.getMessage());
}
}
- private void send(String msg) {
+ public void send(String msg) {
try {
LOG.trace("sending {}", msg);
this.session.getRemote().sendString(msg);
@@ -181,18 +180,16 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
LOG.warn("problem sending message: " + e.getMessage());
}
}
-
- private String getSessionId() {
+ public String getSessionId() {
return this.myUniqueSessionId;
}
- public static void broadCast(String nodeName, String eventType, String xmlEvent) {
- if (clientList != null && clientList.size() > 0) {
+ private void sendToAll(String nodeName, String eventType, String xmlEvent) {
+ if (clientList.size() > 0) {
for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
WebSocketManagerSocket socket = entry.getValue();
if (socket != null) {
try {
-
UserScopes clientScopes = userScopesList.get(socket.getSessionId());
if (clientScopes != null) {
if (clientScopes.hasScope(eventType)) {
@@ -212,5 +209,15 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
}
}
}
+ public static void broadCast(String nodeName, String eventType, String xmlEvent) {
+ if(clientList.size()>0) {
+ Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
+ WebSocketManagerSocket s = e.iterator().next().getValue();
+ if(s!=null)
+ {
+ s.sendToAll(nodeName, eventType, xmlEvent);
+ }
+ }
+ }
}
diff --git a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java
index 0afe06e13..fb2384ee5 100644
--- a/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java
+++ b/sdnr/wt/websocketmanager2/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/websocket/SyncWebSocketClient.java
@@ -19,6 +19,8 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@@ -27,33 +29,53 @@ import org.slf4j.LoggerFactory;
public class SyncWebSocketClient extends WebSocketClient {
+ public interface WebsocketEventHandler{
+ void onMessageReceived(String message);
+ void onOpen(ServerHandshake arg0);
+ void onClose(int arg0, String arg1, boolean arg2);
+ void onError(Exception e);
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SyncWebSocketClient.class.getName());
private String messageToSend;
-
+ private final List<WebsocketEventHandler> handlers;
public SyncWebSocketClient(URI serverUri) {
super(serverUri);
+ this.handlers = new ArrayList<WebsocketEventHandler>();
}
public SyncWebSocketClient(String uri) throws URISyntaxException {
this(new URI(uri));
}
-
+ public void addEventHandler(WebsocketEventHandler h) {
+ this.handlers.add(h);
+ }
+ public void removeEventHandler(WebsocketEventHandler h) {
+ this.handlers.remove(h);
+ }
+
@Override
public void onClose(int arg0, String arg1, boolean arg2) {
LOG.debug("socket closed: {} {} {}", arg0, arg1, arg2);
-
+ for(WebsocketEventHandler h:this.handlers) {
+ h.onClose(arg0,arg1,arg2);
+ }
}
@Override
public void onError(Exception arg0) {
LOG.warn("error on socket: {}", arg0.getMessage());
-
+ for(WebsocketEventHandler h:this.handlers) {
+ h.onError(arg0);
+ }
}
@Override
public void onMessage(String arg0) {
LOG.debug("received message: {}", arg0);
-
+ for(WebsocketEventHandler h:this.handlers) {
+ h.onMessageReceived(arg0);
+ }
}
@Override
@@ -64,7 +86,9 @@ public class SyncWebSocketClient extends WebSocketClient {
this.send(this.messageToSend);
this.messageToSend = null;
}
-
+ for(WebsocketEventHandler h:this.handlers) {
+ h.onOpen(arg0);
+ }
}
public void openAndSendAsync(String message) {