aboutsummaryrefslogtreecommitdiffstats
path: root/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java')
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java209
1 files changed, 209 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java
new file mode 100644
index 0000000..e3a8aa7
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java
@@ -0,0 +1,209 @@
+package org.onap.msb.apiroute.wrapper.queue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.http.HttpEntity;
+import org.onap.msb.apiroute.SyncDataManager;
+import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
+import org.onap.msb.apiroute.wrapper.util.CommonUtil;
+import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+
+public class ServiceListConsumer implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(ServiceListConsumer.class);
+
+ private boolean isRunning = true;
+
+ private int index;
+
+
+ public ServiceListConsumer() {
+ this.index = 0;
+ }
+
+ public void run() {
+ LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
+
+ while (isRunning) {
+ try {
+ // 取最新一条记录
+ ServiceData<HttpEntity> serviceData = QueueManager
+ .getInstance().takeFromServiceListQueue(index);
+ LOGGER.debug("ServiceList Consumer Thread [" + index +
+ "] take out serviceData from Queue successfully");
+
+ HttpEntity newValues = serviceData.getData();
+
+ Set<String> newServiceNameList = filterServiceList(newValues);
+
+ if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
+ boolean initSuccess=initServiceList(newServiceNameList);
+ if(initSuccess){
+ ServiceListCache.setLatestServiceNamelist(newServiceNameList);
+ }
+ } else {
+ updateServiceList(newServiceNameList);
+ ServiceListCache.setLatestServiceNamelist(newServiceNameList);
+ }
+
+
+ } catch (Exception e) {
+ LOGGER.error(
+ "ServiceListConsumer throw Exception: ", e);
+ }
+ }
+ }
+
+ private void startWatchService(String serviceName) {
+ // start to Watch service nodes
+
+ SyncDataManager.startWatchService(serviceName);
+ }
+
+ private void updateServiceList(Set<String> newServiceNameList) {
+ Set<String> registerServiceNameList = CommonUtil.getDiffrent(
+ ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
+
+ if (registerServiceNameList.size() > 0) {
+ LOGGER.info("***need to start Watch Service num from consul :"
+ + registerServiceNameList.size());
+
+ for (String serviceName : registerServiceNameList) {
+ startWatchService(serviceName);
+ }
+ }
+ }
+
+ private boolean initServiceList(Set<String> newServiceNameList) {
+ LOGGER.info("***start to initialize service List when System startup ***");
+
+ Set<String> dbServiceNameList = MicroServiceWrapper
+ .getInstance().getAllMicroServiceKey();
+
+ if(dbServiceNameList==null){
+ LOGGER.error("init ServiceList from redis fail ");
+ return false;
+ }
+
+
+ // 对比删除redis脏数据
+ Set<String> delServiceNameList = CommonUtil.getDiffrent(
+ newServiceNameList, dbServiceNameList);
+
+ LOGGER.info("***need to delete Service num from redis :"
+ + delServiceNameList.size());
+ for (String serviceName : delServiceNameList) {
+ try {
+ MicroServiceWrapper.getInstance()
+ .deleteMicroService4AllVersion(serviceName);
+ LOGGER.info("delete MicroService success from initialize:[serviceName]"
+ + serviceName);
+
+ } catch (Exception e) {
+ LOGGER.error(
+ "initialize serviceList :Delete MicroServiceInfo serviceName:"
+ + serviceName + " FAIL : ", e);
+ }
+ }
+
+ // 启动同步开启监听全部服务列表
+ LOGGER.info("***need to start Watch Service num from initialize :"
+ + newServiceNameList.size());
+
+ for (String serviceName : newServiceNameList) {
+ startWatchService(serviceName);
+ }
+
+ return true;
+
+ }
+
+ /*private ImmutableSet<String> filterServiceList(
+ final Map<String, List<String>> serviceList) {
+ if (serviceList == null || serviceList.isEmpty()) {
+ return ImmutableSet.of();
+ }
+
+ final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+
+ for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
+
+ String key = entry.getKey();
+ if (key != null && !"consul".equals(key)) {
+
+ List<String> value = entry.getValue();
+ if (ServiceFilter.getInstance().isFilterService(value)) {
+ builder.add(key);
+ }
+ }
+ }
+
+ LOGGER.info("consul all service num:" + serviceList.size());
+ LOGGER.info("consul filter service num:" + builder.build().size());
+
+ return builder.build();
+ }
+*/
+ private Set<String> filterServiceList(final HttpEntity serviceList) {
+
+ if (serviceList == null || serviceList.getContentLength() == 0) {
+ return new HashSet<String>();
+ }
+
+ final Set<String> builder = new HashSet<String>();
+
+ JsonFactory f = new JsonFactory();
+ JsonParser jp = null;
+ List<String> tagList = null;
+ int inputServiceNum = 0;
+ try {
+ jp = f.createParser(serviceList.getContent());
+ jp.nextToken();
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String serviceName = jp.getCurrentName();
+ inputServiceNum++;
+ jp.nextToken();
+ tagList = new ArrayList<>();
+ while (jp.nextToken() != JsonToken.END_ARRAY) {
+ tagList.add(jp.getText());
+ }
+
+ if (serviceName != null && !"consul".equals(serviceName)) {
+ if (ServiceFilter.getInstance().isFilterService(tagList)) {
+ builder.add(serviceName);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("parse service list error",e);
+ return new HashSet<String>();
+ } finally {
+ try {
+ jp.close();
+ } catch (IOException e) {
+ LOGGER.warn("parse service list error",e);
+ return new HashSet<String>();
+ }
+ }
+
+ int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size();
+// if(latestServiceNum!=builder.size()){
+ LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum);
+// }
+
+ return builder;
+ }
+
+}