diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java | 239 |
1 files changed, 120 insertions, 119 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java index 90a8477..fb8d9a4 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -33,150 +31,153 @@ import org.slf4j.LoggerFactory; public class ServiceConsumer implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); - private boolean isRunning = true; + private boolean isRunning = true; - private int index; - - - private static final int retryCount=3; + private int index; - //缓存服务信息:key:服务名 和对应的版本列表Set<String> - private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>(); - public ServiceConsumer(final int index) { - this.index = index; - } + private static final int retryCount = 3; + // 缓存服务信息:key:服务名 和对应的版本列表Set<String> + private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>(); - public void run() { + public ServiceConsumer(final int index) { + this.index = index; + } - LOGGER.info("run Service Consumer Thread [" + index + "]"); - while (isRunning) { - try { - ServiceData<List<ServiceHealth>> serviceData; + public void run() { - serviceData = QueueManager.getInstance().takeFromServiceQueue(index); + LOGGER.info("run Service Consumer Thread [" + index + "]"); - // LOGGER.info("Service Consumer Thread [" + index + - // "] take out serviceData from Queue successfully"); + while (isRunning) { + try { + ServiceData<List<ServiceHealth>> serviceData; - if (serviceData.getOperate() == ServiceData.Operate.delete) { - // 删除服务 - deleteMicroService(serviceData); - } else { - // 更新服务 - updateMicroService(serviceData); - } - } catch (InterruptedException e) { - LOGGER.error("ServiceConsumer throw InterruptedException: ", e); - Thread.currentThread().interrupt(); - } + serviceData = QueueManager.getInstance().takeFromServiceQueue(index); - } - } + // LOGGER.info("Service Consumer Thread [" + index + + // "] take out serviceData from Queue successfully"); + if (serviceData.getOperate() == ServiceData.Operate.delete) { + // 删除服务 + deleteMicroService(serviceData); + } else { + // 更新服务 + updateMicroService(serviceData); + } + } catch (InterruptedException e) { + LOGGER.error("ServiceConsumer throw InterruptedException: ", e); + Thread.currentThread().interrupt(); + } + } + } - private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) { - String serviceName = null; - try { - if (serviceData.getData() == null || serviceData.getData().size() == 0) { - throw new Exception("sysn deleteMicroService is wrong:serviceData is empty"); - } - - serviceName = serviceData.getData().get(0).getService().getService(); -// LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] " -// + serviceName); - //ServiceListCache.removeService(serviceName); - MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); - } catch (Exception e) { - LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e); - //删除失败,重试三次 - for(int i=0;i<retryCount;i++){ - + private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) { + String serviceName = null; try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage()); - } - if(reDeleteMicroService(serviceName)){ - LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName); - break; - } - else{ - LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName); + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + throw new Exception("sysn deleteMicroService is wrong:serviceData is empty"); + } + + serviceName = serviceData.getData().get(0).getService().getService(); + // LOGGER.info("Service Consumer [" + index + "] start to delete + // MicroService:[serviceName] " + // + serviceName); + + // ServiceListCache.removeService(serviceName); + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + + } catch (Exception e) { + LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e); + // 删除失败,重试三次 + for (int i = 0; i < retryCount; i++) { + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage()); + } + if (reDeleteMicroService(serviceName)) { + LOGGER.info((i + 1) + "/" + retryCount + " : retry to delete MicroServiceInfo success [serviceName]" + + serviceName); + break; + } else { + LOGGER.error((i + 1) + "/" + retryCount + + " : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName); + } + } } - } } - } - - private boolean reDeleteMicroService(String serviceName){ - try { - MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); - return true; - } catch (Exception e) { - return false; + + private boolean reDeleteMicroService(String serviceName) { + try { + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + return true; + } catch (Exception e) { + return false; + } } - } - private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) { + private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) { - if (serviceData.getData() == null || serviceData.getData().size() == 0) { - LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty "); - return; - } + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty "); + return; + } + + String serviceName = ""; + + try { + + serviceName = serviceData.getData().get(0).getService().getService(); + List<ServiceHealth> serviceNodeList = serviceData.getData(); + + + Map<String, MicroServiceFullInfo> microServiceInfo4version = + ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList); + + // 删除数据库中已不存在的版本号服务信息 + Set<String> newAllVersion = microServiceInfo4version.keySet(); - String serviceName = ""; + if (lastVersionResponse.containsKey(serviceName)) { + Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName); + // Set<String> + // dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName); + Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet); - try { + if (delVersionList.size() > 0) { - serviceName = serviceData.getData().get(0).getService().getService(); - List<ServiceHealth> serviceNodeList = serviceData.getData(); + LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName + "[version]" + + delVersionList); - Map<String, MicroServiceFullInfo> microServiceInfo4version = - ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList); + for (String version : delVersionList) { + MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version); + } - // 删除数据库中已不存在的版本号服务信息 - Set<String> newAllVersion = microServiceInfo4version.keySet(); + } + } - if (lastVersionResponse.containsKey(serviceName)) { - Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName); - // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName); - Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet); + lastVersionResponse.put(serviceName, newAllVersion); - if (delVersionList.size() > 0) { + for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) { + MicroServiceFullInfo new_microServiceFullInfo = entry.getValue(); + MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo); - LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName - + "[version]" + delVersionList); + } - for (String version : delVersionList) { - MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version); - } - + } catch (Exception e) { + LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName); + // 更新失败,重置任务服务的modifyIndex,等待重新更新 + RedisHealthCheck.writeCheckFlag = true; + SyncDataManager.resetIndex(serviceName); } - } - - lastVersionResponse.put(serviceName, newAllVersion); - - for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) { - MicroServiceFullInfo new_microServiceFullInfo = entry.getValue(); - MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo); - - } - - - } catch (Exception e) { - LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName); - //更新失败,重置任务服务的modifyIndex,等待重新更新 - RedisHealthCheck.writeCheckFlag = true; - SyncDataManager.resetIndex(serviceName); } - } } |