aboutsummaryrefslogtreecommitdiffstats
path: root/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
blob: 0349cb548c190e68d7511b6cf2041bbfe48d3adf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;

import com.google.common.eventbus.EventBus;
import java.net.InetSocketAddress;
import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;

/**
 * Passes messages received on a Java web socket to listening application class instances using an event bus.
 *
 * <p>Each handler owns one {@link EventBus}. Raw message blocks, typed message blocks and plain strings posted on
 * the handler are forwarded to the event bus, and listeners are attached to and detached from that bus through
 * {@link #registerMessageHandler(MessageListener)} and {@link #unRegisterMessageHandler(MessageListener)}.
 *
 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
 * @param <M> the type of message carried by the typed message blocks and handled by the listeners
 */
public class MessageBlockHandler<M> {
    // Logger for this class
    private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageBlockHandler.class);

    // Sender name logged when a message block carries no web socket
    private static final String SERVER_SENDER = "server";

    // Sender name logged when a web socket cannot report the address of its remote endpoint
    private static final String UNKNOWN_SENDER = "unknown";

    /**
     * This event bus will forward the events to all of its subscribers.
     */
    private final EventBus eventBus;

    /**
     * Instantiates a new message block handler with its own event bus.
     *
     * @param eventBusName the name of the event bus for this message block handler
     */
    public MessageBlockHandler(final String eventBusName) {
        eventBus = new EventBus(eventBusName);
        LOGGER.trace("message bus {} created ", eventBusName);
    }

    /**
     * Post a raw message block on the event bus of this handler.
     *
     * <p>A {@code null} block, or a block without a message, is ignored.
     *
     * @param rawMessageBlock the block containing raw messages
     */
    public void post(final RawMessageBlock rawMessageBlock) {
        if (rawMessageBlock == null || rawMessageBlock.getMessage() == null) {
            return;
        }

        // Resolving the host name may involve a name service lookup, so only do it when it will be logged
        if (LOGGER.isDebugEnabled()) {
            final String sender = rawMessageBlock.getWebSocket() == null ? SERVER_SENDER
                    : hostNameOf(rawMessageBlock.getWebSocket().getRemoteSocketAddress());
            LOGGER.debug("new raw message recieved from {}", sender);
        }
        eventBus.post(rawMessageBlock);
    }

    /**
     * Post a block of typed messages on the event bus of this handler.
     *
     * <p>A {@code null} block, or a block without messages, is ignored.
     *
     * @param messageBlock the block containing typed messages
     */
    public void post(final MessageBlock<M> messageBlock) {
        if (messageBlock == null || messageBlock.getMessages() == null) {
            return;
        }

        // Resolving the host name may involve a name service lookup, so only do it when it will be logged
        if (LOGGER.isDebugEnabled()) {
            final String sender = messageBlock.getWebSocket() == null ? SERVER_SENDER
                    : hostNameOf(messageBlock.getWebSocket().getRemoteSocketAddress());
            LOGGER.debug("new data message recieved from {}", sender);
        }
        eventBus.post(messageBlock);
    }

    /**
     * Post a string message on the event bus of this handler.
     *
     * <p>A {@code null} string is ignored.
     *
     * @param messageString the string message
     */
    public void post(final String messageString) {
        if (messageString == null) {
            return;
        }

        LOGGER.debug("new string message recieved from server: {}", messageString);
        eventBus.post(messageString);
    }

    /**
     * Register a listener on the event bus of this handler.
     *
     * @param listener the message listener to register
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void registerMessageHandler(final MessageListener<M> listener) {
        // Validate before tracing entry so that every entry trace has a matching exit trace
        if (listener == null) {
            throw new IllegalArgumentException("listener object cannot be null");
        }
        LOGGER.entry(listener);
        eventBus.register(listener);
        LOGGER.debug("message listener {} is registered with forwarder", listener);
        LOGGER.exit();
    }

    /**
     * Remove a listener subscribed to the event bus of this handler.
     *
     * @param listener the message listener to remove
     * @throws IllegalArgumentException if {@code listener} is {@code null}; per the Guava {@code EventBus} contract
     *         the same exception is expected when the listener was never registered
     */
    public void unRegisterMessageHandler(final MessageListener<M> listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener object cannot be null");
        }
        LOGGER.entry(listener);
        eventBus.unregister(listener);
        LOGGER.trace(" message listener {} unregistered from forwarder", listener);
        LOGGER.exit();
    }

    /**
     * Get a loggable host name for the remote endpoint of a web socket.
     *
     * <p>The address is allowed to be {@code null} so that building a debug log line can never prevent a message
     * from being posted.
     *
     * @param remoteAddress the remote address reported by the web socket, may be {@code null}
     * @return the host name of the address, or a placeholder when no address is available
     */
    private static String hostNameOf(final InetSocketAddress remoteAddress) {
        return remoteAddress == null ? UNKNOWN_SENDER : remoteAddress.getHostName();
    }
}