/******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.http.HttpEntity; import org.onap.msb.apiroute.SyncDataManager; import org.onap.msb.apiroute.wrapper.MicroServiceWrapper; import org.onap.msb.apiroute.wrapper.util.CommonUtil; import org.onap.msb.apiroute.wrapper.util.ServiceFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; public class ServiceListConsumer implements Runnable { private static final Logger LOGGER = LoggerFactory .getLogger(ServiceListConsumer.class); private boolean isRunning = true; private int index; public ServiceListConsumer() { this.index = 0; } public void run() { LOGGER.info("run ServiceList Consumer Thread [" + index + "]"); while (isRunning) { try { // 取最新一条记录 ServiceData serviceData = QueueManager .getInstance().takeFromServiceListQueue(index); LOGGER.debug("ServiceList Consumer Thread [" + index + "] take out serviceData from Queue successfully"); HttpEntity newValues = serviceData.getData(); Set newServiceNameList = filterServiceList(newValues); if (ServiceListCache.getLatestServiceNamelist().size() == 0) { boolean initSuccess=initServiceList(newServiceNameList); if(initSuccess){ ServiceListCache.setLatestServiceNamelist(newServiceNameList); } } else { updateServiceList(newServiceNameList); ServiceListCache.setLatestServiceNamelist(newServiceNameList); } } catch (Exception e) { LOGGER.error( "ServiceListConsumer throw Exception: ", e); } } } private void startWatchService(String serviceName) { // start to Watch service nodes SyncDataManager.startWatchService(serviceName); } private void updateServiceList(Set newServiceNameList) { Set registerServiceNameList = CommonUtil.getDiffrent( ServiceListCache.getLatestServiceNamelist(), newServiceNameList); if (registerServiceNameList.size() > 0) { LOGGER.info("***need to start Watch Service num from consul :" + registerServiceNameList.size()); for (String serviceName : registerServiceNameList) { startWatchService(serviceName); } } } private boolean initServiceList(Set newServiceNameList) { LOGGER.info("***start to initialize service List when System startup ***"); Set dbServiceNameList = MicroServiceWrapper .getInstance().getAllMicroServiceKey(); if(dbServiceNameList==null){ LOGGER.error("init ServiceList from redis fail "); return false; } // 对比删除redis脏数据 Set delServiceNameList = CommonUtil.getDiffrent( newServiceNameList, dbServiceNameList); LOGGER.info("***need to delete Service num from redis :" + delServiceNameList.size()); for (String serviceName : delServiceNameList) { try { MicroServiceWrapper.getInstance() .deleteMicroService4AllVersion(serviceName); LOGGER.info("delete MicroService success from initialize:[serviceName]" + serviceName); } catch (Exception e) { LOGGER.error( "initialize serviceList :Delete MicroServiceInfo serviceName:" + serviceName + " FAIL : ", e); } } // 启动同步开启监听全部服务列表 LOGGER.info("***need to start Watch Service num from initialize :" + newServiceNameList.size()); for (String serviceName : newServiceNameList) { startWatchService(serviceName); } return true; } /*private ImmutableSet filterServiceList( final Map> serviceList) { if (serviceList == null || serviceList.isEmpty()) { return ImmutableSet.of(); } final ImmutableSet.Builder builder = ImmutableSet.builder(); for (Map.Entry> entry : serviceList.entrySet()) { String key = entry.getKey(); if (key != null && !"consul".equals(key)) { List value = entry.getValue(); if (ServiceFilter.getInstance().isFilterService(value)) { builder.add(key); } } } LOGGER.info("consul all service num:" + serviceList.size()); LOGGER.info("consul filter service num:" + builder.build().size()); return builder.build(); } */ private Set filterServiceList(final HttpEntity serviceList) { if (serviceList == null || serviceList.getContentLength() == 0) { return new HashSet(); } final Set builder = new HashSet(); JsonFactory f = new JsonFactory(); JsonParser jp = null; List tagList = null; int inputServiceNum = 0; try { jp = f.createParser(serviceList.getContent()); jp.nextToken(); while (jp.nextToken() != JsonToken.END_OBJECT) { String serviceName = jp.getCurrentName(); inputServiceNum++; jp.nextToken(); tagList = new ArrayList<>(); while (jp.nextToken() != JsonToken.END_ARRAY) { tagList.add(jp.getText()); } if (serviceName != null && !"consul".equals(serviceName)) { if (ServiceFilter.getInstance().isFilterService(tagList)) { builder.add(serviceName); } } } } catch (IOException e) { LOGGER.warn("parse service list error",e); return new HashSet(); } finally { try { jp.close(); } catch (IOException e) { LOGGER.warn("parse service list error",e); return new HashSet(); } } int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size(); // if(latestServiceNum!=builder.size()){ LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum); // } return builder; } }