diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java new file mode 100644 index 0000000..865778e --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java @@ -0,0 +1,135 @@ +package org.onap.msb.apiroute; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.http.HttpEntity; +import org.onap.msb.apiroute.wrapper.consulextend.Consul; +import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckServiceDataEmptyAndAutoStopWatchFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.CheckTagAndAutoStopWatchFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.ServiceModifyIndexFilter; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchCatalogServicesTask; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WatchServiceHealthTask; +import org.onap.msb.apiroute.wrapper.consulextend.expose.WriteBufferHandler; +import org.onap.msb.apiroute.wrapper.queue.ServiceConsumer; +import org.onap.msb.apiroute.wrapper.queue.ServiceData; +import org.onap.msb.apiroute.wrapper.queue.ServiceListConsumer; +import org.onap.msb.apiroute.wrapper.util.RouteUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncDataManager { + private static Consul consul; + private static WatchCatalogServicesTask serviceListWatchTask; + private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>(); + + private static final Logger LOGGER = LoggerFactory + .getLogger(SyncDataManager.class); + + private SyncDataManager() { + } + + public static void initSyncTask(final String ip, final int port) { + consul = Consul.builder().withHostAndPort(ip, port).build(); + startWatchServiceList(); + startQueueConsumer(); + } + + public static void startWatchServiceList() { + + LOGGER.info("===========start to WatchServiceList============"); + + // create service list watch task + serviceListWatchTask = new WatchCatalogServicesTask( + consul.catalogClient(), RouteUtil.WATCH_SECOND); + + // first,write data to serviceListQueue buffer. + // second,async thread will read data from serviceListQueue buffer. + serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>( + ServiceData.DataType.service_list)); + + // start watch + serviceListWatchTask.startWatch(); + } + + public static void startQueueConsumer() { + LOGGER.info("===========start to QueueConsumer Thread============"); + + // start ServiceListConsumer + new Thread(new ServiceListConsumer(), "ServiceListConsumerThread") + .start(); + + // start Service Consumer + int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM; + for (int i = 0; i < serviceQueneNum; i++) { + new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i) + .start(); + } + + } + + public static void startWatchService(final String serviceName) { + + LOGGER.info("===========start to Watch Service[" + serviceName + + "]============"); + // create service watch task + WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask( + consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND); + + // 1.service Data Empty filter + serviceWatchTask + .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter( + serviceName)); + + // 2.service change filter + serviceWatchTask.addFilter(new ServiceModifyIndexFilter()); + + // 3.apigateway tag filter:check tag and auto stop watch + serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter( + serviceName)); + + // start watch + serviceWatchTask.startWatch(); + + // save + serviceWatchTaskMap.put(serviceName, serviceWatchTask); + } + + public static void stopWatchServiceList() { + if (serviceListWatchTask != null) { + serviceListWatchTask.removeAllFilter(); + serviceListWatchTask.removeAllHandler(); + serviceListWatchTask.stopWatch(); + } + } + + public static void stopWatchService(String serviceName) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("stop " + serviceName + " service watch!"); + } + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + if (watchTask != null) { + watchTask.removeAllFilter(); + watchTask.removeAllHandler(); + watchTask.stopWatch(); + } + serviceWatchTaskMap.remove(serviceName); + } + + public static boolean resetIndex(String serviceName) { + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + + if (watchTask != null) { + return watchTask.resetIndex(); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find:" + serviceName); + } + + return false; + } + +} |