diff options
author | Michael DÜrre <michael.duerre@highstreet-technologies.com> | 2021-04-08 06:34:22 +0200 |
---|---|---|
committer | Michael DÜrre <michael.duerre@highstreet-technologies.com> | 2021-04-08 06:34:46 +0200 |
commit | f3969004c6ccac18e742c5fc48c844e315991023 (patch) | |
tree | f5486a62e842bb16ca7d3af47a8663df08feef55 /sdnr/wt/websocketmanager/provider/src/main/java | |
parent | a252be83694ae33260d99d5371ed48c1558aa2e8 (diff) |
update websocketmanager
update complete notification flow
Issue-ID: CCSDK-3252
Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com>
Change-Id: I87ba00f615707b942471fcace57bcda50ce37e61
Diffstat (limited to 'sdnr/wt/websocketmanager/provider/src/main/java')
8 files changed, 1039 insertions, 0 deletions
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java new file mode 100644 index 000000000..7b4916d5a --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManager.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerSocket.EventInputCallback; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterConfig; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterNodeInfo; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket.SyncWebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketManager extends WebSocketServlet { + + private static final long serialVersionUID = -681665669062744439L; + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName()); + private static final String APPLICATION_NAME = WebSocketManager.class.getName(); + private static final int PORT = 8181; + private final EventInputCallback rpcEventInputCallback; + private final AkkaConfig akkaConfig; + /** + * timeout for websocket with no messages in ms + */ + //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L; + private static final long IDLE_TIMEOUT = 0L; + + private final ArrayList<URI> clusterNodeClients = new ArrayList<>(); + + public WebSocketManager() { + this(null, null); + } + + public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) { + super(); + this.akkaConfig = akkaconfig; + if (cb != null) { + this.rpcEventInputCallback = cb; + } else { + this.rpcEventInputCallback = message -> { + LOG.debug("onMessagePushed: " + message); + SyncWebSocketClient client; + for (URI clientURI : WebSocketManager.this.clusterNodeClients) { + client = new SyncWebSocketClient(clientURI); + LOG.debug("try to push message to " + client.getURI()); + client.openAndSendAndCloseSync(message); + } + }; + } + LOG.info("Create servlet for {}", APPLICATION_NAME); + } + + @Override + public void configure(WebSocketServletFactory factory) { + LOG.info("Configure provider for {}", APPLICATION_NAME); + // set a second timeout + factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT); + factory.getPolicy().setMaxBinaryMessageSize(1); + factory.getPolicy().setMaxTextMessageSize(64 * 1024); + + // register Socket as the WebSocket to create on Upgrade + factory.register(WebSocketManagerSocket.class); + + AkkaConfig cfg = this.akkaConfig; + if (cfg == null) { + try { + cfg = AkkaConfig.load(); + } catch (Exception e) { + LOG.warn("problem loading akka config: " + e.getMessage()); + } + } + if (cfg != null && cfg.isCluster()) { + this.initWSClients(cfg.getClusterConfig()); + } + } + + /********************************************************** + * Private functions + */ + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + if (req.getHeader("Upgrade") != null) { + /* Accept upgrade request */ + resp.setStatus(101); + resp.setHeader("Upgrade", "XYZP"); + resp.setHeader("Connection", "Upgrade"); + resp.setHeader("OtherHeaderB", "Value"); + } + } + + private void initWSClients(ClusterConfig clusterConfig) { + for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) { + if (clusterConfig.isMe(nodeConfig)) { + continue; + } + String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT); + try { + LOG.debug("registering ws client for " + url); + clusterNodeClients.add(new URI(url)); + } catch (URISyntaxException e) { + LOG.warn("problem instantiating wsclient for url: " + url); + } + } + } +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java new file mode 100644 index 000000000..0b6e9b453 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager; + +import java.time.Instant; +import javax.servlet.ServletException; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.WebsocketManagerService; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput; +import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.opendaylight.yangtools.yang.common.QName; +import org.osgi.service.http.HttpService; +import org.osgi.service.http.NamespaceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketManagerProvider implements WebsocketManagerService, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerProvider.class); + private static final String APPLICATION_NAME = WebSocketManagerProvider.class.getName(); + private static final String ALIAS = "/websocket"; + + private WebSocketManager wsServlet = null; + + public WebSocketManagerProvider() { + LOG.info("Creating provider for {}", APPLICATION_NAME); + } + + + public void init() { + LOG.info("Init provider for {}", APPLICATION_NAME); + } + + @Override + public void close() throws Exception { + LOG.info("Close provider for {}", APPLICATION_NAME); + } + + public void onUnbindService(HttpService httpService) { + httpService.unregister(ALIAS); + wsServlet = null; + } + + public void onBindService(HttpService httpService) throws ServletException, NamespaceException { + if (httpService == null) { + LOG.warn("Unable to inject HttpService into DluxLoader. dlux modules won't work without httpService"); + } else { + + if (wsServlet == null) { + wsServlet = new WebSocketManager(); + httpService.registerServlet(ALIAS, wsServlet, null, null); + LOG.info("websocket servlet registered."); + } else { + LOG.warn("Servelt "); + } + } + + } + + public WebSocketManager getWsServlet() { + return wsServlet; + } + + public void setWsServlet(WebSocketManager wsServlet) { + this.wsServlet = wsServlet; + } + + + @Override + public void sendNotification(Notification notification, String nodeId, QName eventType) { + this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now())); + } + + @Override + public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) { + WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime)); + + } + + @Override + public void sendNotification(DOMNotification notification, String nodeId, QName eventType) { + LOG.warn("not yet implemented"); + + } + + @Override + public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) { + LOG.warn("not yet implemented"); + + } + +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java new file mode 100644 index 000000000..945de3c1f --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java @@ -0,0 +1,245 @@ +/* +* ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes; +import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketManagerSocket extends WebSocketAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName()); + public static final String MSG_KEY_DATA = "data"; + public static final DataType MSG_KEY_SCOPES = DataType.scopes; + public static final String MSG_KEY_PARAM = "param"; + public static final String MSG_KEY_VALUE = "value"; + public static final String MSG_KEY_SCOPE = "scope"; + + public static final String KEY_NODEID = "nodeId"; + public static final String KEY_EVENTTYPE = "eventType"; + private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\""; + private static final Pattern PATTERN_SCOPEREGISTRATION = + Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE); + private static final Random RND = new Random(); + + + /** + * list of all sessionids + */ + private static final List<String> sessionIds = new ArrayList<>(); + /** + * map of sessionid <=> UserScopes + */ + private static final HashMap<String, UserScopes> userScopesList = new HashMap<>(); + /** + * map of class.hashCode <=> class + */ + private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>(); + + private static final YangToolsMapper mapper = new YangToolsMapper(); + private final String myUniqueSessionId; + + private Session session = null; + + public interface EventInputCallback { + void onMessagePushed(final String message) throws Exception; + } + + public WebSocketManagerSocket() { + this.myUniqueSessionId = _genSessionId(); + } + + @Override + protected void finalize() throws Throwable { + sessionIds.remove(this.myUniqueSessionId); + } + + private static String _genSessionId() { + String sid = String.valueOf(RND.nextLong()); + while (sessionIds.contains(sid)) { + sid = String.valueOf(RND.nextLong()); + } + sessionIds.add(sid); + return sid; + } + + @Override + public void onWebSocketText(String message) { + LOG.info("{} has sent {}", this.getRemoteAdr(), message); + if (!this.manageClientRequest(message)) { + this.manageClientRequest2(message); + } + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + LOG.debug("Binary not supported"); + } + + @Override + public void onWebSocketConnect(Session sess) { + this.session = sess; + clientList.put(String.valueOf(this.hashCode()), this); + LOG.debug("client connected from " + this.getRemoteAdr()); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + clientList.remove(String.valueOf(this.hashCode())); + LOG.debug("client disconnected from " + this.getRemoteAdr()); + } + + @Override + public void onWebSocketError(Throwable cause) { + LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage()); + // super.onWebSocketError(cause); + } + + private String getRemoteAdr() { + String adr = "unknown"; + try { + adr = this.session.getRemoteAddress().toString(); + } catch (Exception e) { + LOG.debug("error resolving adr: {}", e.getMessage()); + } + return adr; + } + + /** + * + * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]} + * @return if handled + */ + private boolean manageClientRequest(String request) { + boolean ret = false; + final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request); + if(!matcher.find()) { + return false; + } + try { + ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class); + if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) { + ret = true; + String sessionId = this.getSessionId(); + UserScopes clientDto = new UserScopes(); + clientDto.setScopes(registration.getScopes()); + userScopesList.put(sessionId, clientDto); + this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes()))); + } + + } catch (JsonProcessingException e) { + LOG.warn("problem set scope: " + e.getMessage()); + try { + this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage()))); + } catch (JsonProcessingException e1) { + LOG.warn("problem sending error response via ws: " + e1); + } + } + return ret; + } + + /* + * broadcast message to all your clients + */ + private void manageClientRequest2(String request) { + try { + NotificationOutput notification = mapper.readValue(request, NotificationOutput.class); + if (notification.getNodeId() != null && notification.getType() != null) { + this.sendToAll(notification.getNodeId(), notification.getType(), request); + } + } catch (Exception e) { + LOG.warn("handle ws request failed:" + e.getMessage()); + } + } + + public void send(String msg) { + try { + LOG.trace("sending {}", msg); + this.session.getRemote().sendString(msg); + } catch (Exception e) { + LOG.warn("problem sending message: " + e.getMessage()); + } + } + + public String getSessionId() { + return this.myUniqueSessionId; + } + + private void sendToAll(NotificationOutput output) { + try { + this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output)); + } catch (JsonProcessingException e) { + LOG.warn("problem serializing noitifcation: ", e); + } + } + + private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) { + if (clientList.size() > 0) { + for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) { + WebSocketManagerSocket socket = entry.getValue(); + if (socket != null) { + try { + UserScopes clientScopes = userScopesList.get(socket.getSessionId()); + if (clientScopes != null) { + if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) { + socket.send(notification); + } else { + LOG.debug("client has not scope {}", reducedSchemaInfo); + } + } else { + LOG.debug("no scopes for notifications registered"); + } + } catch (Exception ioe) { + LOG.warn(ioe.getMessage()); + } + } else { + LOG.debug("cannot broadcast. socket is null"); + } + } + } + } + + public static void broadCast(NotificationOutput output) { + if (clientList.size() > 0) { + Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet(); + WebSocketManagerSocket s = e.iterator().next().getValue(); + if (s != null) { + s.sendToAll(output); + } + } + } + +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java new file mode 100644 index 000000000..6627eeadf --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/data/TimeRateLimitingQueue.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.data; + +import java.util.concurrent.ArrayBlockingQueue; + +public class TimeRateLimitingQueue<T> extends ArrayBlockingQueue<T>{ + + public TimeRateLimitingQueue(int capacity) { + super(capacity); + // TODO Auto-generated constructor stub + } + + +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java new file mode 100644 index 000000000..794515bb2 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/AkkaConfig.java @@ -0,0 +1,207 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AkkaConfig { + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName()); + + public static class ClusterNodeInfo { + @Override + public String toString() { + return "ClusterNodeInfo [protocol=" + protocol + ", clusterName=" + clusterName + ", remoteAdr=" + remoteAdr + + ", port=" + port + "]"; + } + + private final String protocol; + private final String clusterName; + private final String remoteAdr; + private final int port; + + public String getRemoteAddress() { + return this.remoteAdr; + } + + public ClusterNodeInfo(String s) throws Exception { + final String regex = "([a-z.]*):\\/\\/([a-zA-Z0-9-]*)@([a-zA-Z0-9.-]*):([0-9]*)"; + final Pattern pattern = Pattern.compile(regex); + final Matcher matcher = pattern.matcher(s); + if (!matcher.find()) { + throw new Exception("invalid seedNode format"); + } + this.protocol = matcher.group(1); + this.clusterName = matcher.group(2); + this.remoteAdr = matcher.group(3); + this.port = Integer.parseInt(matcher.group(4)); + } + } + public static class ClusterRoleInfo { + @Override + public String toString() { + return "ClusterRoleInfo [Role=" + Role + ", Index=" + Index + "]"; + } + + private final String Role; + private final int Index; + + public ClusterRoleInfo(String s) throws Exception { + final String regex = "([a-z]*)-([0-9]*)"; + final Pattern pattern = Pattern.compile(regex); + final Matcher matcher = pattern.matcher(s); + if (!matcher.find()) { + throw new Exception("invalid role format"); + } + this.Role = matcher.group(1); + this.Index = Integer.parseInt(matcher.group(2)); + } + + } + public static class ClusterConfig { + @Override + public String toString() { + return "ClusterConfig [seedNodes=" + seedNodes + ", roles=" + roles + "]"; + } + + private final List<ClusterNodeInfo> seedNodes; + private final List<ClusterRoleInfo> roles; + private final ClusterNodeInfo ismeInfo; + + public ClusterConfig(Config o) throws Exception { + { + this.seedNodes = new ArrayList<>(); + List<String> a = o.getStringList("seed-nodes"); + for (int i = 0; i < a.size(); i++) { + ClusterNodeInfo info = new ClusterNodeInfo(a.get(i)); + this.seedNodes.add(info); + } + this.roles = new ArrayList<>(); + a = o.getStringList("roles"); + for (int i = 0; i < a.size(); i++) { + ClusterRoleInfo s = new ClusterRoleInfo(a.get(i)); + this.roles.add(s); + } + int idx = this.roles.get(0).Index - 1; + if (idx >= 0 && idx < this.seedNodes.size()) { + this.ismeInfo = this.seedNodes.get(idx); + } else { + this.ismeInfo = null; + } + } + + } + + public boolean isCluster() { + return this.seedNodes != null ? this.seedNodes.size() > 1 : false; + } + + public boolean isMe(ClusterNodeInfo i) { + return this.ismeInfo != null ? this.ismeInfo.equals(i) : false; + } + + public List<ClusterNodeInfo> getSeedNodes() { + return this.seedNodes; + } + } + + private static final String DEFAULT_FILENAME = "configuration/initial/akka.conf"; + private final File file; + private final String resourceFilename; + private final String fileContent; + private ClusterConfig cluserConfig; + + public ClusterConfig getClusterConfig() { + return this.cluserConfig; + } + + private AkkaConfig(File file, boolean isResource) { + this.file = isResource ? null : file; + this.fileContent = null; + this.resourceFilename = isResource ? file.getName() : null; + } + + private AkkaConfig(String fileContent) { + this.file = null; + this.fileContent = fileContent; + this.resourceFilename = null; + } + + + @Override + public String toString() { + return "AkkaConfig [filename=" + file + ", cluserConfig=" + cluserConfig + "]"; + } + + private void loadFromFile() throws Exception { + Config cfg = null; + if (this.file != null) { + cfg = ConfigFactory.parseFile(this.file); + } else if (this.fileContent != null) { + cfg = ConfigFactory.parseString(this.fileContent); + } else if (this.resourceFilename != null) { + cfg = ConfigFactory.parseResources(this.getClass(), this.resourceFilename); + } + + if (cfg != null) { + this.cluserConfig = + new ClusterConfig(cfg.getConfig("odl-cluster-data").getConfig("akka").getConfig("cluster")); + } else { + LOG.warn("unable to parse config file"); + this.cluserConfig = null; + } + } + + public boolean isCluster() { + return this.cluserConfig != null ? this.cluserConfig.isCluster() : false; + } + + public static AkkaConfig load() throws Exception { + return load(DEFAULT_FILENAME); + } + + public static AkkaConfig load(String filename) throws Exception { + return load(filename, false); + } + + public static AkkaConfig load(String filename, boolean isResource) throws Exception { + AkkaConfig cfg = new AkkaConfig(new File(filename), isResource); + cfg.loadFromFile(); + + return cfg; + } + + public static AkkaConfig loadContent(String content) throws Exception { + AkkaConfig cfg = new AkkaConfig(content); + cfg.loadFromFile(); + + return cfg; + } + + + +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java new file mode 100644 index 000000000..5f3a5af2c --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java @@ -0,0 +1,138 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; + +import java.time.Duration; +import java.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Problems of to many notifications during mount of thousand of devices: + * <ul> + * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate + * <li>Notification processing blocks user -> App design with notifications popups + * </ul> + * Rate filter + * <ul> + * <li>Do not use a thread -> Do nothing if there are no notifications + * <li>Parameter1 integrationTime : Measurement or integration time for period + * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload + * <li>Start measurement on event received that comes later then + * </ul> + * + * <pre> + * Example (e: Event received, rateMaxCount=3) + * eee e e e e e e e e e e e e e e + * ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------| + * P1 P2 P1 P2 P3 P7 P1 + *Overload no no yes yes no no + * + * + *Intention to use: + * 1. Construct with parameters for WS stream to handle + * 2. + * </pre> + */ + +public class RateFilter { + + private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName()); + + private final Duration integrationTime; // Integration time to measure event rate + private final long rateMaxCount; //Rate for dropping packets + private Instant timeStampPeriodStart; //Time stamp period beginn + private Instant timeStampLastEvent; //Measurement interval + private long rateCount; // >0: integration running 0: no integration running + private boolean overload; //true means in overload status. Change at end of period only. + private GetNow get; + + /** + * Allow testing with own timestamp provider + */ + public interface GetNow { + Instant now(); + } + + public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) { + this.integrationTime = integrationTime; + this.rateMaxCount = rateMaxCount; + this.get = getNowMethod; + this.timeStampLastEvent = Instant.MIN; + } + + public RateFilter(Duration integrationTime, long rateMaxCount) { + this(integrationTime, rateMaxCount, () -> Instant.now()); + } + + public synchronized boolean getOverloadStatus() { + return overload; + } + + /** + * Handle filter on event received + */ + public synchronized void filterEvent() { + final Instant now = get.now(); + final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now); + this.timeStampLastEvent = now; + + if (durationSinceLastEvent.compareTo(integrationTime) >= 0) { + //No measurement. Sync and start with period + LOG.debug("Sync"); + timeStampPeriodStart = now; + rateCount = 1; //Reset event count .. is part of the + } else { + //Within period + Duration durationPeriod = Duration.between(timeStampPeriodStart, now); + rateCount++; + boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0; + LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime); + if (endOfPeriod) { + //Only if end of Period + overload = rateCount > rateMaxCount; + LOG.debug("Reset overload {}", overload); + timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime); + rateCount = 0; + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("RateFilter [integrationTime="); + builder.append(integrationTime); + builder.append(", rateMaxCount="); + builder.append(rateMaxCount); + builder.append(", timeStampPeriodStart="); + builder.append(timeStampPeriodStart); + builder.append(", timeStampLastEvent="); + builder.append(timeStampLastEvent); + builder.append(", rateCount="); + builder.append(rateCount); + builder.append(", overload="); + builder.append(overload); + builder.append("]"); + return builder.toString(); + } +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java new file mode 100644 index 000000000..3969bcb15 --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/UserScopes.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils; + +import java.util.List; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo; +import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.Scope; + +public class UserScopes { + + private List<Scope> scopes; + + /** + * + * @param list array of Strings + */ + public void setScopes(List<Scope> list) { + this.scopes = list; + } + + public boolean hasScope(NotificationOutput output) { + return this.hasScope(output.getNodeId(), output.getType()); + } + + public boolean hasScope(ReducedSchemaInfo schema) { + return this.hasScope(null, schema); + } + + public boolean hasScope(String nodeId, ReducedSchemaInfo reducedSchemaInfo) { + if (this.scopes == null) + return false; + for (Scope scope : this.scopes) { + if (scope.matches(nodeId, reducedSchemaInfo)) { + return true; + } + } + return false; + } + +} diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java new file mode 100644 index 000000000..c9177205b --- /dev/null +++ b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/websocket/SyncWebSocketClient.java @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncWebSocketClient extends WebSocketClient { + + public interface WebsocketEventHandler { + void onMessageReceived(String message); + + void onOpen(ServerHandshake arg0); + + void onClose(int arg0, String arg1, boolean arg2); + + void onError(Exception e); + } + + private static final Logger LOG = LoggerFactory.getLogger(SyncWebSocketClient.class.getName()); + private String messageToSend; + private final List<WebsocketEventHandler> handlers; + + public SyncWebSocketClient(URI serverUri) { + super(serverUri); + this.handlers = new ArrayList<WebsocketEventHandler>(); + } + + public SyncWebSocketClient(String uri) throws URISyntaxException { + this(new URI(uri)); + } + + public void addEventHandler(WebsocketEventHandler h) { + this.handlers.add(h); + } + + public void removeEventHandler(WebsocketEventHandler h) { + this.handlers.remove(h); + } + + @Override + public void onClose(int arg0, String arg1, boolean arg2) { + LOG.debug("socket closed: {} {} {}", arg0, arg1, arg2); + for (WebsocketEventHandler h : this.handlers) { + h.onClose(arg0, arg1, arg2); + } + } + + @Override + public void onError(Exception arg0) { + LOG.warn("error on socket: {}", arg0.getMessage()); + for (WebsocketEventHandler h : this.handlers) { + h.onError(arg0); + } + } + + @Override + public void onMessage(String arg0) { + LOG.debug("received message: {}", arg0); + for (WebsocketEventHandler h : this.handlers) { + h.onMessageReceived(arg0); + } + } + + @Override + public void onOpen(ServerHandshake arg0) { + LOG.debug("socket opened"); + if (this.messageToSend != null) { + LOG.debug("try to send: " + this.messageToSend); + this.send(this.messageToSend); + this.messageToSend = null; + } + for (WebsocketEventHandler h : this.handlers) { + h.onOpen(arg0); + } + } + + public void openAndSendAsync(String message) { + this.messageToSend = message; + this.connect(); + } + + public void openAndSendAndCloseSync(String message) { + try { + this.connectBlocking(); + } catch (InterruptedException e) { + LOG.warn("problem connecting:" + e.getMessage()); + Thread.currentThread().interrupt(); + } + this.send(message); + try { + this.closeBlocking(); + } catch (InterruptedException e) { + LOG.warn("problem disconnecting:" + e.getMessage()); + Thread.currentThread().interrupt(); + } + } + +} |