blob: 3134c67cf4b15c2d97e0f9262965961839e8db83 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;
import com.google.common.eventbus.EventBus;
import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
* This class is used to pass messages received on a Java web socket to listening application class instances using an
* event bus.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <M> the generic type
*/
public class MessageBlockHandler<M> {
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageBlockHandler.class);
/**
* This event bus will forward the events to all of its subscribers.
*/
private EventBus eventBus = null;
/**
* Instantiates a new data handler.
*
* @param eventBusName the name of the event bus for this message block handler
*/
public MessageBlockHandler(final String eventBusName) {
eventBus = new EventBus(eventBusName);
LOGGER.trace("message bus {} created ", eventBusName);
}
/**
* Post a raw message block on the data handler event bus of this class.
*
* @param rawMessageBlock the block containing raw messages
*/
public void post(final RawMessageBlock rawMessageBlock) {
if (rawMessageBlock.getMessage() != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("new raw message recieved from {}", rawMessageBlock.getConn() == null ? "server"
: rawMessageBlock.getConn().getRemoteSocketAddress().getHostName());
}
eventBus.post(rawMessageBlock);
}
}
/**
* Post a block of typed messages on the data handler event bus of this class.
*
* @param messageBlock the block containing typed messages
*/
public void post(final MessageBlock<M> messageBlock) {
if (messageBlock.getMessages() != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("new data message recieved from {}", messageBlock.getConnection() == null ? "server"
: messageBlock.getConnection().getRemoteSocketAddress().getHostName());
}
eventBus.post(messageBlock);
}
}
/**
* Post a string message on the data handler event bus of this class.
*
* @param messageString the string message
*/
public void post(final String messageString) {
if (messageString != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("new string message recieved from server: {}", messageString);
}
eventBus.post(messageString);
}
}
/**
* Register a listener to event bus.
*
* @param listener is an instance of WebSocketMessageListener
*/
public void registerMessageHandler(final MessageListener<M> listener) {
LOGGER.entry(listener);
if (listener == null) {
throw new IllegalArgumentException("listener object cannot be null");
}
eventBus.register(listener);
LOGGER.debug("message listener {} is registered with forwarder", listener);
LOGGER.exit();
}
/**
* Remove the listener subscribed to the event bus.
*
* @param listener the listener
*/
public void unRegisterMessageHandler(final MessageListener<M> listener) {
if (listener == null) {
throw new IllegalArgumentException("listener object cannot be null");
}
LOGGER.entry(listener);
eventBus.unregister(listener);
LOGGER.trace(" message listener {} unregistered from forwarder", listener);
LOGGER.exit();
}
}
|