diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java | 221 |
1 files changed, 105 insertions, 116 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java index aa211aa..0acb9ee 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/SyncDataManager.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute; @@ -34,117 +32,108 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SyncDataManager { - private static Consul consul; - private static WatchCatalogServicesTask serviceListWatchTask; - private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = new ConcurrentHashMap<String, WatchServiceHealthTask>(); - - private static final Logger LOGGER = LoggerFactory - .getLogger(SyncDataManager.class); + private static Consul consul; + private static WatchCatalogServicesTask serviceListWatchTask; + private final static Map<String, WatchServiceHealthTask> serviceWatchTaskMap = + new ConcurrentHashMap<String, WatchServiceHealthTask>(); - private SyncDataManager() { - } + private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataManager.class); - public static void initSyncTask(final String ip, final int port) { - consul = Consul.builder().withHostAndPort(ip, port).build(); - startWatchServiceList(); - startQueueConsumer(); - } - - public static void startWatchServiceList() { - - LOGGER.info("===========start to WatchServiceList============"); - - // create service list watch task - serviceListWatchTask = new WatchCatalogServicesTask( - consul.catalogClient(), RouteUtil.WATCH_SECOND); - - // first,write data to serviceListQueue buffer. - // second,async thread will read data from serviceListQueue buffer. - serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>( - ServiceData.DataType.service_list)); - - // start watch - serviceListWatchTask.startWatch(); - } - - public static void startQueueConsumer() { - LOGGER.info("===========start to QueueConsumer Thread============"); - - // start ServiceListConsumer - new Thread(new ServiceListConsumer(), "ServiceListConsumerThread") - .start(); - - // start Service Consumer - int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM; - for (int i = 0; i < serviceQueneNum; i++) { - new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i) - .start(); - } - - } - - public static void startWatchService(final String serviceName) { - - LOGGER.info("===========start to Watch Service[" + serviceName - + "]============"); - // create service watch task - WatchServiceHealthTask serviceWatchTask = new WatchServiceHealthTask( - consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND); - - // 1.service Data Empty filter - serviceWatchTask - .addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter( - serviceName)); - - // 2.service change filter - serviceWatchTask.addFilter(new ServiceModifyIndexFilter()); - - // 3.apigateway tag filter:check tag and auto stop watch - serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter( - serviceName)); - - // start watch - serviceWatchTask.startWatch(); - - // save - serviceWatchTaskMap.put(serviceName, serviceWatchTask); - } - - public static void stopWatchServiceList() { - if (serviceListWatchTask != null) { - serviceListWatchTask.removeAllFilter(); - serviceListWatchTask.removeAllHandler(); - serviceListWatchTask.stopWatch(); - } - } - - public static void stopWatchService(String serviceName) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("stop " + serviceName + " service watch!"); - } - - WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); - if (watchTask != null) { - watchTask.removeAllFilter(); - watchTask.removeAllHandler(); - watchTask.stopWatch(); - } - serviceWatchTaskMap.remove(serviceName); - } - - public static boolean resetIndex(String serviceName) { - - WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + private SyncDataManager() {} - if (watchTask != null) { - return watchTask.resetIndex(); - } + public static void initSyncTask(final String ip, final int port) { + consul = Consul.builder().withHostAndPort(ip, port).build(); + startWatchServiceList(); + startQueueConsumer(); + } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset modify index.did not find:" + serviceName); - } + public static void startWatchServiceList() { - return false; - } + LOGGER.info("===========start to WatchServiceList============"); + + // create service list watch task + serviceListWatchTask = new WatchCatalogServicesTask(consul.catalogClient(), RouteUtil.WATCH_SECOND); + + // first,write data to serviceListQueue buffer. + // second,async thread will read data from serviceListQueue buffer. + serviceListWatchTask.addHandler(new WriteBufferHandler<HttpEntity>(ServiceData.DataType.service_list)); + + // start watch + serviceListWatchTask.startWatch(); + } + + public static void startQueueConsumer() { + LOGGER.info("===========start to QueueConsumer Thread============"); + + // start ServiceListConsumer + new Thread(new ServiceListConsumer(), "ServiceListConsumerThread").start(); + + // start Service Consumer + int serviceQueneNum = RouteUtil.SERVICE_DATA_QUEUE_NUM; + for (int i = 0; i < serviceQueneNum; i++) { + new Thread(new ServiceConsumer(i), "ServiceConsumerThread" + i).start(); + } + + } + + public static void startWatchService(final String serviceName) { + + LOGGER.info("===========start to Watch Service[" + serviceName + "]============"); + // create service watch task + WatchServiceHealthTask serviceWatchTask = + new WatchServiceHealthTask(consul.healthClient(), serviceName, RouteUtil.WATCH_SECOND); + + // 1.service Data Empty filter + serviceWatchTask.addFilter(new CheckServiceDataEmptyAndAutoStopWatchFilter(serviceName)); + + // 2.service change filter + serviceWatchTask.addFilter(new ServiceModifyIndexFilter()); + + // 3.apigateway tag filter:check tag and auto stop watch + serviceWatchTask.addFilter(new CheckTagAndAutoStopWatchFilter(serviceName)); + + // start watch + serviceWatchTask.startWatch(); + + // save + serviceWatchTaskMap.put(serviceName, serviceWatchTask); + } + + public static void stopWatchServiceList() { + if (serviceListWatchTask != null) { + serviceListWatchTask.removeAllFilter(); + serviceListWatchTask.removeAllHandler(); + serviceListWatchTask.stopWatch(); + } + } + + public static void stopWatchService(String serviceName) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("stop " + serviceName + " service watch!"); + } + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + if (watchTask != null) { + watchTask.removeAllFilter(); + watchTask.removeAllHandler(); + watchTask.stopWatch(); + } + serviceWatchTaskMap.remove(serviceName); + } + + public static boolean resetIndex(String serviceName) { + + WatchServiceHealthTask watchTask = serviceWatchTaskMap.get(serviceName); + + if (watchTask != null) { + return watchTask.resetIndex(); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find:" + serviceName); + } + + return false; + } } |