summaryrefslogtreecommitdiffstats
path: root/netconf/restconf/restconf-nb-bierman02/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java
blob: a67a4919163272656e835a75ef90807f0b632c78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.restconf.impl.websockets.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketClient {

    private static final Logger LOG = LoggerFactory.getLogger(WebSocketClient.class);

    private final URI uri;
    private final Bootstrap bootstrap = new Bootstrap();
    private final WebSocketClientHandler clientHandler;
    private Channel clientChannel;
    private final EventLoopGroup group = new NioEventLoopGroup();

    public WebSocketClient(final URI uri, final IClientMessageCallback clientMessageCallback) {
        this.uri = uri;
        clientHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri,
                WebSocketVersion.V13, null, false, null), clientMessageCallback);
        // last null could be replaced with DefaultHttpHeaders
        initialize();
    }

    private void initialize() {

        String protocol = uri.getScheme();
        if (!"ws".equals(protocol)) {
            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
        }

        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(final SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("http-codec", new HttpClientCodec());
                pipeline.addLast("aggregator", new HttpObjectAggregator(8192));
                pipeline.addLast("ws-handler", clientHandler);
            }
        });
    }

    public void connect() throws InterruptedException {
        LOG.info("WebSocket Client connecting");
        clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
        clientHandler.handshakeFuture().sync();
    }

    public void writeAndFlush(final String message) {
        clientChannel.writeAndFlush(new TextWebSocketFrame(message));
    }

    public void writeAndFlush(final Object message) {
        clientChannel.writeAndFlush(message);
    }

    public void ping() {
        clientChannel.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 })));
    }

    public void close(final String reasonText) throws InterruptedException {
        CloseWebSocketFrame closeWebSocketFrame = new CloseWebSocketFrame(1000, reasonText);
        clientChannel.writeAndFlush(closeWebSocketFrame);

        // WebSocketClientHandler will close the connection when the server
        // responds to the CloseWebSocketFrame.
        clientChannel.closeFuture().sync();
        group.shutdownGracefully();
    }

    public static void main(final String[] args) throws Exception {
        URI uri;
        if (args.length > 0) {
            uri = new URI(args[0]);
        } else {
            uri = new URI("http://192.168.1.101:8181/opendaylight-inventory:nodes");
        }
        IClientMessageCallback messageCallback = new ClientMessageCallback();
        WebSocketClient webSocketClient = new WebSocketClient(uri, messageCallback);
        webSocketClient.connect();

        while (true) {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String input = br.readLine();
            if (input.equals("q")) {
                LOG.info("Would you like to close stream? (Y = yes, empty = yes)\n");
                input = br.readLine();
                if (input.equals("yes") || input.isEmpty()) {
                    webSocketClient.close("opendaylight-inventory:nodes");
                    break;
                }
            }
        }
    }

    private static class ClientMessageCallback implements IClientMessageCallback {
        @Override
        public void onMessageReceived(final Object message) {
            if (message instanceof TextWebSocketFrame) {
                LOG.info("received message {}" + ((TextWebSocketFrame) message).text());
            }
        }
    }

}