diff options
author | 2017-07-25 15:18:33 +0800 | |
---|---|---|
committer | 2017-07-25 18:11:59 +0800 | |
commit | 672f3d40be83d9e380fd7be4b674d5e8d5fa36de (patch) | |
tree | 43105e1d5e2ba8e8accea8648e57e1cf87db3f00 /apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java | |
parent | 41d3db15a8e1a0496f9c2a5e15db2998a32bb9bf (diff) |
Divide the MSB source codes into two repos
Change-Id: Ie76d545b214a8ce5191f215350a623e1529983d9
Issue-id: MSB-5
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java new file mode 100644 index 0000000..865778e --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java @@ -0,0 +1,135 @@ +package org.onap.msb.apiroute; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.http.HttpEntity; +import org.onap.msb.apiroute.wrapper.consulextend.Consul; +import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckServiceDataEmptyAndAutoStopWatchFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckTagAndAutoStopWatchFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.ServiceModifyIndexFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchCatalogServicesTask; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchServiceHealthTask; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WriteBufferHandler; +import org.onap.msb.apiroute.wrapper.queue.ServiceConsumer; +import org.onap.msb.apiroute.wrapper.queue.ServiceData; +import org.onap.msb.apiroute.wrapper.queue.ServiceListConsumer; +import org.onap.msb.apiroute.wrapper.util.RouteUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncDataManager { + private static Consul consul; + private static WatchCatalogServicesTask serviceListWatchTask; + private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>(); + + private static final Logger LOGGER = LoggerFactory + .getLogger(SyncDataManager.class); + + private SyncDataManager() { + } + + public static void initSyncTask(final String ip, final int port) { + consul = Consul.builder().withHostAndPort(ip, port).build(); + startWatchServiceList(); + startQueueConsumer(); + } + + public static void startWatchServiceList() { + + LOGGER.info("===========start to WatchServiceList============"); + + // create service list watch task + serviceListWatchTask = new WatchCatalogServicesTask( + consul.catalogClient(), RouteUtil.WATCH_SECOND); + + // first,write data to serviceListQueue buffer. + // second,async thread will read data from serviceListQueue buffer. + serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>( + ServiceData.DataType.service_list)); + + // start watch + serviceListWatchTask.startWatch(); + } + + public static void startQueueConsumer() { + LOGGER.info("===========start to QueueConsumer Thread============"); + + // start ServiceListConsumer + new Thread(new ServiceListConsumer(), "ServiceListConsumerThread") + .start(); + + // start Service Consumer + int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM; + for (int i = 0; i < serviceQueneNum; i++) { + new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i) + .start(); + } + + } + + public static void startWatchService(final String serviceName) { + + LOGGER.info("===========start to Watch Service[" + serviceName + + "]============"); + // create service watch task + WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask( + consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND); + + // 1.service Data Empty filter + serviceWatchTask + .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter( + serviceName)); + + // 2.service change filter + serviceWatchTask.addFilter(new ServiceModifyIndexFilter()); + + // 3.apigateway tag filter:check tag and auto stop watch + serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter( + serviceName)); + + // start watch + serviceWatchTask.startWatch(); + + // save + serviceWatchTaskMap.put(serviceName, serviceWatchTask); + } + + public static void stopWatchServiceList() { + if (serviceListWatchTask != null) { + serviceListWatchTask.removeAllFilter(); + serviceListWatchTask.removeAllHandler(); + serviceListWatchTask.stopWatch(); + } + } + + public static void stopWatchService(String serviceName) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("stop " + serviceName + " service watch!"); + } + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + if (watchTask != null) { + watchTask.removeAllFilter(); + watchTask.removeAllHandler(); + watchTask.stopWatch(); + } + serviceWatchTaskMap.remove(serviceName); + } + + public static boolean resetIndex(String serviceName) { + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + + if (watchTask != null) { + return watchTask.resetIndex(); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find:" + serviceName); + } + + return false; + } + +} |