summaryrefslogtreecommitdiffstats
path: root/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebManager.java
blob: b38bcb79fabe9bad0932fd46ecaf3bffc77f45a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/*-
 * ================================================================================
 * eCOMP Portal SDK
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ================================================================================
 */
package org.openecomp.portalsdk.core.onboarding.ueb;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiConstants;
import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiProperties;

/**
 * Manages UEB interactions and provides methods for publishing requests,
 * replies and others.
 */
public class UebManager {

	private final Log logger = LogFactory.getLog(getClass());

	private WaitingRequestersQueueList waitingRequestersQueueList;
	private PublisherList publisherList = new PublisherList();
	private static UebManager uebManager = null;

	private final String inTopicName;
	private final String consumerGroupName;
	private final String outTopicName;
	private final String appUebKey;
	private final String appUebSecret;

	private Publisher appPublisher;
	private Thread listenerThread;
	private boolean bThisIsEcompPortalServer = false;

	/**
	 * Constructor initializes fields and validates values obtained from
	 * properties.
	 * 
	 * The picture below is a simplified view of the relationships among ECOMP
	 * Portal and applications communicating via UEB:
	 * 
	 * <PRE>
	*                      ECOMP out to many.
	*                      App out to only ECOMP.
	*
	*  |----------------|<---------------------------------------------   
	*  |                |                                         | |  |
	*  |                |---------------------------> App 1 ------  |  |
	*  |  ECOMP Portal  |---------------------------> App 2 ---------  |
	*  |                |                            ...               |
	*  |                |---------------------------> App n -----------
	*  |----------------|
	 * </PRE>
	 * 
	 * @throws IOException
	 */
	protected UebManager() throws UebException {
		waitingRequestersQueueList = null;
		listenerThread = null;
		outTopicName = PortalApiProperties.getProperty(PortalApiConstants.ECOMP_PORTAL_INBOX_NAME);
		inTopicName = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME);
		appUebKey = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_KEY);
		appUebSecret = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_SECRET);
		String consGrp = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME);

		if (outTopicName == null || outTopicName.length() == 0)
			throw new UebException("Failed to get property " + PortalApiConstants.ECOMP_PORTAL_INBOX_NAME, null, null,
					null);
		if (inTopicName == null || inTopicName.length() == 0)
			throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME, null,
					null, null);
		if (consGrp == null || consGrp.length() == 0)
			throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME, null,
					null, null);
		if (appUebKey == null || appUebKey.length() == 0)
			throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_KEY, null, null, null);
		if (appUebSecret == null || appUebSecret.length() == 0)
			throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_SECRET, null, null, null);
		List<String> uebUrlList = Helper.uebUrlList();
		if (uebUrlList == null || uebUrlList.size() == 0)
			throw new UebException("Failed to get property" + PortalApiConstants.UEB_URL_LIST, null, null, null);
		// A bit of magic: if consumer group is a magic token, generate one.
		consumerGroupName = (PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME_GENERATOR.equals(consGrp)
				? UUID.randomUUID().toString() : consGrp);
	}

	/**
	 * Gets the static instance, creating it if necessary.
	 * 
	 * @return Instance of UebManager
	 * @throws IOException
	 */
	public static synchronized UebManager getInstance() throws UebException {
		if (uebManager == null) {
			uebManager = new UebManager();
		}
		return uebManager;
	}

	/**
	 * Answers whether the getInstance() method has previously been called.
	 * 
	 * @return True if a static instance is available, else false.
	 */
	public static boolean isInstanceAvailable() {
		return uebManager != null;
	}

	/**
	 * Creates a list of waiting requesters, creates and a consumer using cached
	 * information, and starts a new thread to run the consumer that listens for
	 * messages published to the inbound topic configured in the constructor.
	 * 
	 * @param inboxQueue
	 *            Queue supplied to the consumer. If not null, the consumer will
	 *            enqueue every message it receives.
	 */
	public void initListener(ConcurrentLinkedQueue<UebMsg> inboxQueue) throws UebException {
		waitingRequestersQueueList = new WaitingRequestersQueueList();
		Consumer runnable = new Consumer(appUebKey, appUebSecret, inTopicName, consumerGroupName, inboxQueue,
				waitingRequestersQueueList);
		this.listenerThread = new Thread(runnable, "UEBConsumerThread");
		this.listenerThread.start();
		Helper.sleep(400); // UEB functions more reliably when we give this some
							// time

		logger.info("UEBManager instance starting... " + inTopicName + " listener thread "
				+ this.listenerThread.getName() + " state = " + this.listenerThread.getState());

		/*
		 * ECOMP Portal manages a dynamic list of outbound topics and so the
		 * outTopicName is initialized in this logic with the same value as the
		 * inbound topic. The real outbound topics name will be added to the
		 * publisher list for ECOMP Portal. For an SDK/App instance only one
		 * publisher is needed, appPublisher.
		 */
		if (inTopicName.equalsIgnoreCase(outTopicName)) {
			this.bThisIsEcompPortalServer = true;
		} else {
			appPublisher = new Publisher(appUebKey, appUebSecret, outTopicName);
			Helper.sleep(400);
		}
	}

	/**
	 * Creates and adds a publisher to the list for the specified topic. This
	 * should only be called by the ECOMP Portal App, other Apps have just one
	 * publisher and use appPublisher
	 * 
	 * @param topicName
	 */
	public void addPublisher(String topicName) {
		logger.info("UEBManager adding publisher for " + topicName);
		Publisher outBoxToAppPublisher = new Publisher(appUebKey, appUebSecret, topicName);
		publisherList.addPublisherToMap(topicName, outBoxToAppPublisher);
	}

	/**
	 * Removes a publisher from the list for the specified topic.
	 *
	 * This should only be called by the ECOMP Portal App, other Apps have just
	 * one publisher and use appPublisher
	 * 
	 * @param topicName
	 */
	public void removePublisher(String topicName) {
		logger.info("UEBManager removing publisher for " + topicName);
		publisherList.removePublisherFromMap(topicName);
	}

	/**
	 * Adds the default ECOMP message ID to the message and sends the message to
	 * the topic.
	 * 
	 * @param msg
	 * @throws UebException
	 */
	public void publish(UebMsg msg) throws UebException {
		msg.putMsgId(PortalApiConstants.ECOMP_DEFAULT_MSG_ID);
		appPublisher.send(msg);
	}

	/**
	 * Sends the message using the default publisher.
	 * 
	 * @param msg
	 * @throws UebException
	 */
	public void publishReply(UebMsg msg) throws UebException {
		// Caller populates msgId with the echoed value from the request
		appPublisher.send(msg);
	}

	/**
	 * Sends the message using the appropriate publisher for the specified
	 * topic.
	 * 
	 * @param msg
	 * @param topicName
	 * @throws UebException
	 */
	public void publishEP(UebMsg msg, String topicName) throws UebException {
		Publisher publisher = publisherList.getPublisher(topicName);
		if (publisher != null) {
			msg.putMsgId(PortalApiConstants.ECOMP_DEFAULT_MSG_ID);
			publisher.send(msg);
		}
	}

	/**
	 * Publishes a reply using the appropriate publisher for the specified
	 * topic.
	 * 
	 * @param msg
	 * @param topicName
	 * @throws UebException
	 */
	public void publishReplyEP(UebMsg msg, String topicName) throws UebException {
		// Caller populates msgId with the echoed value from the request
		Publisher publisher = publisherList.getPublisher(topicName);
		if (publisher != null) {
			publisher.send(msg);
		}
	}

	/**
	 * Sends the specified message using the specified publisher, and waits for
	 * a reply. Retransmits if no reply is received in 5 seconds; gives up after
	 * 3 retries.
	 * 
	 * @param msg
	 * @param publisher
	 * @return Message from a remote publisher, or null if timeout happens.
	 * @throws UebException
	 */
	public UebMsg requestReplyUsingPublisher(UebMsg msg, Publisher publisher) throws UebException {
		UebMsg reply = null;
		if (waitingRequestersQueueList == null) {
			logger.error("requestReplyUsingPublisher called but listener thread not initialized");
		} else {
			// Storing a non-default message ID identifies this as a synchronous
			// request
			msg.putMsgId(UUID.randomUUID().toString());

			/*
			 * Create a queue for this request, the consumer thread will insert
			 * the reply on this queue
			 */
			LinkedBlockingQueue<UebMsg> replyQueue = new LinkedBlockingQueue<UebMsg>();
			waitingRequestersQueueList.addQueueToMap(msg.getMsgId(), replyQueue);

			/*
			 * Send the request
			 */
			publisher.send(msg);

			/*
			 * Wait for reply up to 3 * 5 = 15 seconds
			 */
			int reTransmits = 0;
			int maxRetransmits = 3;
			int retransmitTimeMs = 5000;
			long sendTimeStamp = System.currentTimeMillis();
			while (reTransmits < maxRetransmits) {
				if ((reply = replyQueue.poll()) != null)
					break;

				long now = System.currentTimeMillis();
				if (now - sendTimeStamp > retransmitTimeMs) {
					logger.debug("Retransmitting send... msg = " + msg.getPayload() + msg.getMsgId());
					publisher.send(msg);
					sendTimeStamp = System.currentTimeMillis();
					reTransmits++;
				}
			}
			waitingRequestersQueueList.removeQueueFromMap(msg.getMsgId());
			if (reTransmits == maxRetransmits)
				throw new UebException(PortalApiConstants.ECOMP_UEB_TIMEOUT_ERROR, inTopicName, null, msg.toString());

		}
		return reply;
	}

	/**
	 * Sends the specified message using the default publisher and waits for a
	 * reply.
	 * 
	 * @param msg
	 * @return Message from a remote publisher, or null if timeout happens.
	 * @throws UebException
	 */
	public UebMsg requestReply(UebMsg msg) throws UebException {
		return requestReplyUsingPublisher(msg, appPublisher);
	}

	/**
	 * Sends the specified message using the publisher appropriate for the
	 * specified topic name, and waits for a reply.
	 * 
	 * @param msg
	 * @param topicName
	 * @return Message from a remote publisher, or null if timeout happens.
	 * @throws UebException
	 */
	public UebMsg requestReplyEP(UebMsg msg, String topicName) throws UebException {
		UebMsg returnMsg = null;
		Publisher publisher = publisherList.getPublisher(topicName);
		if (publisher != null) {
			returnMsg = requestReplyUsingPublisher(msg, publisher);
		}
		return returnMsg;
	}

	/**
	 * Publishes the payload as a UEB widget-notification message on the default
	 * publisher. Intended for use by Apps inter widget communication, not EP
	 * itself.
	 * 
	 * @param payload
	 * @param userId
	 */
	public void postWidgetNotification(String payload, String userId) throws UebException {
		UebMsg msg = new UebMsg();
		msg.putPayload(payload);
		msg.putUserId(userId);
		msg.putMsgType(UebMsgTypes.UEB_MSG_TYPE_WIDGET_NOTIFICATION);
		this.publish(msg);
	}

	/**
	 * Interrupts the long-running thread that runs the consumer.
	 */
	public void shutdown() {
		if (this.listenerThread != null) {
			this.listenerThread.interrupt();
		}
	}
}