summaryrefslogtreecommitdiffstats
path: root/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
blob: 18cdccce3ae19779903fa82bb1904314f87152d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.java_websocket.WebSocket;
import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;

/**
 * The Class InternalMessageBusServer handles the server side of a web socket and handles the callback mechanism used to
 * receive messages on the web socket.
 *
 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
 * @param <M> the generic type
 */
abstract class InternalMessageBusServer<M> extends WebSocketServerImpl implements MessagingService<M> {
    // Logger for this class
    private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class);

    private static final int THREAD_FACTORY_STACK_SIZE = 256;

    // Name of the event bus.
    private static final String RAW_EVENT_BUS = "Raw-Event-Bus";

    // This instance handles the raw data received from the web socket
    private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();

    // The message block handler to which to pass messages coming in on this client
    private MessageBlockHandler<M> messageBlockHandler = null;

    // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
    // thread. These fields hold the thread and
    // the thread factory for creating threads.
    private ApplicationThreadFactory threadFactory =
            new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE);
    private Thread forwarderThread = null;

    /**
     * Construct the class and start the forwarding thread for received messages.
     *
     * @param address the address of the server machine
     */
    protected InternalMessageBusServer(final InetSocketAddress address) {
        // Call the super class to create the web socket
        super(address);
        LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort());

        // Create the data handler for forwarding messages
        messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
        messageBlockHandler.registerMessageHandler(rawMessageHandler);

        // Create the thread that manages the queue in the data handler
        forwarderThread = threadFactory.newThread(rawMessageHandler);
        forwarderThread.start();

        LOGGER.exit();
    }

    /**
     * Callback for binary messages received from the remote host.
     *
     * @param webSocket the web socket on which the raw message was received
     * @param rawMessage the received raw message
     * @see #onMessage(WebSocket, String)
     */
    @Override
    public void onMessage(final WebSocket webSocket, final ByteBuffer rawMessage) {
        messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket));
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public void onMessage(final WebSocket webSocket, final String stringMessage) {
        messageBlockHandler.post(stringMessage);
    }

    /**
     * Register a subscriber class to the raw message handler.
     *
     * @param subscriber the subscriber
     */
    @Override
    public void addMessageListener(final MessageListener<M> subscriber) {
        rawMessageHandler.registerDataForwarder(subscriber);
    }

    /**
     * Removes the message listener.
     *
     * @param subscriber the subscriber
     */
    @Override
    public void removeMessageListener(final MessageListener<M> subscriber) {
        rawMessageHandler.unRegisterDataForwarder(subscriber);
    }

    /**
     * Stop the thread handling message forwarding.
     */
    protected void stopListener() {
        rawMessageHandler.shutdown();
    }
}