diff options
author | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-07-25 15:18:33 +0800 |
---|---|---|
committer | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-07-25 18:11:59 +0800 |
commit | 672f3d40be83d9e380fd7be4b674d5e8d5fa36de (patch) | |
tree | 43105e1d5e2ba8e8accea8648e57e1cf87db3f00 /apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java | |
parent | 41d3db15a8e1a0496f9c2a5e15db2998a32bb9bf (diff) |
Divide the MSB source codes into two repos
Change-Id: Ie76d545b214a8ce5191f215350a623e1529983d9
Issue-id: MSB-5
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java new file mode 100644 index 0000000..29d705f --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java @@ -0,0 +1,167 @@ +package org.onap.msb.apiroute.wrapper.queue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.onap.msb.apiroute.SyncDataManager; +import org.onap.msb.apiroute.api.MicroServiceFullInfo; +import org.onap.msb.apiroute.health.RedisHealthCheck; +import org.onap.msb.apiroute.wrapper.MicroServiceWrapper; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.onap.msb.apiroute.wrapper.util.CommonUtil; +import org.onap.msb.apiroute.wrapper.util.ServiceFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ServiceConsumer implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); + + private boolean isRunning = true; + + private int index; + + + private static final int retryCount=3; + + //缓存服务信息:key:服务名 和对应的版本列表Set<String> + private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>(); + + public ServiceConsumer(final int index) { + this.index = index; + } + + + public void run() { + + LOGGER.info("run Service Consumer Thread [" + index + "]"); + + while (isRunning) { + try { + ServiceData<List<ServiceHealth>> serviceData; + + serviceData = QueueManager.getInstance().takeFromServiceQueue(index); + + // LOGGER.info("Service Consumer Thread [" + index + + // "] take out serviceData from Queue successfully"); + + if (serviceData.getOperate() == ServiceData.Operate.delete) { + // 删除服务 + deleteMicroService(serviceData); + } else { + // 更新服务 + updateMicroService(serviceData); + } + } catch (InterruptedException e) { + LOGGER.error("ServiceConsumer throw InterruptedException: ", e); + Thread.currentThread().interrupt(); + } + + } + } + + + + private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) { + String serviceName = null; + try { + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + throw new Exception("sysn deleteMicroService is wrong:serviceData is empty"); + } + + serviceName = serviceData.getData().get(0).getService().getService(); +// LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] " +// + serviceName); + + //ServiceListCache.removeService(serviceName); + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + + } catch (Exception e) { + LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e); + //删除失败,重试三次 + for(int i=0;i<retryCount;i++){ + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage()); + } + if(reDeleteMicroService(serviceName)){ + LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName); + break; + } + else{ + LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName); + } + } + } + } + + private boolean reDeleteMicroService(String serviceName){ + try { + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + return true; + } catch (Exception e) { + return false; + } + } + + private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) { + + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty "); + return; + } + + String serviceName = ""; + + try { + + serviceName = serviceData.getData().get(0).getService().getService(); + List<ServiceHealth> serviceNodeList = serviceData.getData(); + + + Map<String, MicroServiceFullInfo> microServiceInfo4version = + ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList); + + // 删除数据库中已不存在的版本号服务信息 + Set<String> newAllVersion = microServiceInfo4version.keySet(); + + if (lastVersionResponse.containsKey(serviceName)) { + Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName); + // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName); + Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet); + + if (delVersionList.size() > 0) { + + LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName + + "[version]" + delVersionList); + + + for (String version : delVersionList) { + MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version); + } + + } + } + + lastVersionResponse.put(serviceName, newAllVersion); + + for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) { + MicroServiceFullInfo new_microServiceFullInfo = entry.getValue(); + MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo); + + } + + + } catch (Exception e) { + LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName); + //更新失败,重置任务服务的modifyIndex,等待重新更新 + RedisHealthCheck.writeCheckFlag = true; + SyncDataManager.resetIndex(serviceName); + } + } +} |