diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java new file mode 100644 index 0000000..e3a8aa7 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java @@ -0,0 +1,209 @@ +package org.onap.msb.apiroute.wrapper.queue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.http.HttpEntity; +import org.onap.msb.apiroute.SyncDataManager; +import org.onap.msb.apiroute.wrapper.MicroServiceWrapper; +import org.onap.msb.apiroute.wrapper.util.CommonUtil; +import org.onap.msb.apiroute.wrapper.util.ServiceFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + + +public class ServiceListConsumer implements Runnable { + + private static final Logger LOGGER = LoggerFactory + .getLogger(ServiceListConsumer.class); + + private boolean isRunning = true; + + private int index; + + + public ServiceListConsumer() { + this.index = 0; + } + + public void run() { + LOGGER.info("run ServiceList Consumer Thread [" + index + "]"); + + while (isRunning) { + try { + // 取最新一条记录 + ServiceData<HttpEntity> serviceData = QueueManager + .getInstance().takeFromServiceListQueue(index); + LOGGER.debug("ServiceList Consumer Thread [" + index + + "] take out serviceData from Queue successfully"); + + HttpEntity newValues = serviceData.getData(); + + Set<String> newServiceNameList = filterServiceList(newValues); + + if (ServiceListCache.getLatestServiceNamelist().size() == 0) { + boolean initSuccess=initServiceList(newServiceNameList); + if(initSuccess){ + ServiceListCache.setLatestServiceNamelist(newServiceNameList); + } + } else { + updateServiceList(newServiceNameList); + ServiceListCache.setLatestServiceNamelist(newServiceNameList); + } + + + } catch (Exception e) { + LOGGER.error( + "ServiceListConsumer throw Exception: ", e); + } + } + } + + private void startWatchService(String serviceName) { + // start to Watch service nodes + + SyncDataManager.startWatchService(serviceName); + } + + private void updateServiceList(Set<String> newServiceNameList) { + Set<String> registerServiceNameList = CommonUtil.getDiffrent( + ServiceListCache.getLatestServiceNamelist(), newServiceNameList); + + if (registerServiceNameList.size() > 0) { + LOGGER.info("***need to start Watch Service num from consul :" + + registerServiceNameList.size()); + + for (String serviceName : registerServiceNameList) { + startWatchService(serviceName); + } + } + } + + private boolean initServiceList(Set<String> newServiceNameList) { + LOGGER.info("***start to initialize service List when System startup ***"); + + Set<String> dbServiceNameList = MicroServiceWrapper + .getInstance().getAllMicroServiceKey(); + + if(dbServiceNameList==null){ + LOGGER.error("init ServiceList from redis fail "); + return false; + } + + + // 对比删除redis脏数据 + Set<String> delServiceNameList = CommonUtil.getDiffrent( + newServiceNameList, dbServiceNameList); + + LOGGER.info("***need to delete Service num from redis :" + + delServiceNameList.size()); + for (String serviceName : delServiceNameList) { + try { + MicroServiceWrapper.getInstance() + .deleteMicroService4AllVersion(serviceName); + LOGGER.info("delete MicroService success from initialize:[serviceName]" + + serviceName); + + } catch (Exception e) { + LOGGER.error( + "initialize serviceList :Delete MicroServiceInfo serviceName:" + + serviceName + " FAIL : ", e); + } + } + + // 启动同步开启监听全部服务列表 + LOGGER.info("***need to start Watch Service num from initialize :" + + newServiceNameList.size()); + + for (String serviceName : newServiceNameList) { + startWatchService(serviceName); + } + + return true; + + } + + /*private ImmutableSet<String> filterServiceList( + final Map<String, List<String>> serviceList) { + if (serviceList == null || serviceList.isEmpty()) { + return ImmutableSet.of(); + } + + final ImmutableSet.Builder<String> builder = ImmutableSet.builder(); + + for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) { + + String key = entry.getKey(); + if (key != null && !"consul".equals(key)) { + + List<String> value = entry.getValue(); + if (ServiceFilter.getInstance().isFilterService(value)) { + builder.add(key); + } + } + } + + LOGGER.info("consul all service num:" + serviceList.size()); + LOGGER.info("consul filter service num:" + builder.build().size()); + + return builder.build(); + } +*/ + private Set<String> filterServiceList(final HttpEntity serviceList) { + + if (serviceList == null || serviceList.getContentLength() == 0) { + return new HashSet<String>(); + } + + final Set<String> builder = new HashSet<String>(); + + JsonFactory f = new JsonFactory(); + JsonParser jp = null; + List<String> tagList = null; + int inputServiceNum = 0; + try { + jp = f.createParser(serviceList.getContent()); + jp.nextToken(); + while (jp.nextToken() != JsonToken.END_OBJECT) { + String serviceName = jp.getCurrentName(); + inputServiceNum++; + jp.nextToken(); + tagList = new ArrayList<>(); + while (jp.nextToken() != JsonToken.END_ARRAY) { + tagList.add(jp.getText()); + } + + if (serviceName != null && !"consul".equals(serviceName)) { + if (ServiceFilter.getInstance().isFilterService(tagList)) { + builder.add(serviceName); + } + } + } + } catch (IOException e) { + LOGGER.warn("parse service list error",e); + return new HashSet<String>(); + } finally { + try { + jp.close(); + } catch (IOException e) { + LOGGER.warn("parse service list error",e); + return new HashSet<String>(); + } + } + + int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size(); +// if(latestServiceNum!=builder.size()){ + LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum); +// } + + return builder; + } + +} |