diff options
author | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-09-07 14:33:18 +0800 |
---|---|---|
committer | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-09-07 14:40:59 +0800 |
commit | e75a8ef2372722c0b22669fb427d47bacc5b8d5e (patch) | |
tree | cee85cbc4fe818262fb8c4d733f2fac0c8024df8 /apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue | |
parent | e5fe5a022f4cc5164c1f4516c024617c49f12978 (diff) |
Fix java check style warning
Change-Id: I98a6d7237a213d007ad4d954989cb0b0fa150a10
Issue-Id: MSB-67
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue')
8 files changed, 523 insertions, 563 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/BaseQueue.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/BaseQueue.java index 4d77204..f6d7bba 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/BaseQueue.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/BaseQueue.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -21,31 +19,27 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public abstract class BaseQueue<T> { - - private final List<BlockingQueue<ServiceData<T>>> queueArray= new ArrayList<BlockingQueue<ServiceData<T>>>(); - - public BaseQueue(int queueNum,int queueCapacity) - { - for(int i=0;queueNum>0 && i<queueNum;i++) - { - queueArray.add(new LinkedBlockingQueue<ServiceData<T>>(queueCapacity)); - } - } - - public int getQueneNum(){ - return queueArray.size(); - } - - protected BlockingQueue<ServiceData<T>> getQueue(int index) - { - return queueArray.get(index); - } - - public abstract void put(final ServiceData<T> data) throws InterruptedException; - - public abstract ServiceData<T> take(final int queueIndex) throws InterruptedException; - - - + + private final List<BlockingQueue<ServiceData<T>>> queueArray = new ArrayList<BlockingQueue<ServiceData<T>>>(); + + public BaseQueue(int queueNum, int queueCapacity) { + for (int i = 0; queueNum > 0 && i < queueNum; i++) { + queueArray.add(new LinkedBlockingQueue<ServiceData<T>>(queueCapacity)); + } + } + + public int getQueneNum() { + return queueArray.size(); + } + + protected BlockingQueue<ServiceData<T>> getQueue(int index) { + return queueArray.get(index); + } + + public abstract void put(final ServiceData<T> data) throws InterruptedException; + + public abstract ServiceData<T> take(final int queueIndex) throws InterruptedException; + + } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/QueueManager.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/QueueManager.java index f959032..ab492b2 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/QueueManager.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/QueueManager.java @@ -1,22 +1,19 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; import java.util.List; -import java.util.Map; import org.apache.http.HttpEntity; import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; @@ -28,53 +25,48 @@ import org.slf4j.LoggerFactory; public class QueueManager { - private static final Logger LOGGER = LoggerFactory - .getLogger(QueueManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(QueueManager.class); - private final BaseQueue<HttpEntity> serviceListQueue; - private final BaseQueue<List<ServiceHealth>> serviceQueue; + private final BaseQueue<HttpEntity> serviceListQueue; + private final BaseQueue<List<ServiceHealth>> serviceQueue; - private volatile static QueueManager instance = null; + private volatile static QueueManager instance = null; - public static QueueManager getInstance() { - if (instance == null) { - synchronized (QueueManager.class) { - if (instance == null) { - instance = new QueueManager(); - } - } - } - return instance; - } + public static QueueManager getInstance() { + if (instance == null) { + synchronized (QueueManager.class) { + if (instance == null) { + instance = new QueueManager(); + } + } + } + return instance; + } - private QueueManager() { - serviceListQueue = new ServiceListQueue( - RouteUtil.SERVICE_LIST_QUEUE_CAPACITY); - serviceQueue = new ServiceQueue(RouteUtil.SERVICE_DATA_QUEUE_NUM, - RouteUtil.SERVICE_QUEUE_CAPACITY); - } + private QueueManager() { + serviceListQueue = new ServiceListQueue(RouteUtil.SERVICE_LIST_QUEUE_CAPACITY); + serviceQueue = new ServiceQueue(RouteUtil.SERVICE_DATA_QUEUE_NUM, RouteUtil.SERVICE_QUEUE_CAPACITY); + } - public ServiceData<HttpEntity> takeFromServiceListQueue( - int queueIndex) throws InterruptedException { - return serviceListQueue.take(queueIndex); - } + public ServiceData<HttpEntity> takeFromServiceListQueue(int queueIndex) throws InterruptedException { + return serviceListQueue.take(queueIndex); + } - public ServiceData<List<ServiceHealth>> takeFromServiceQueue(int queueIndex) - throws InterruptedException { - return serviceQueue.take(queueIndex); - } + public ServiceData<List<ServiceHealth>> takeFromServiceQueue(int queueIndex) throws InterruptedException { + return serviceQueue.take(queueIndex); + } - @SuppressWarnings("unchecked") - public <T> void putIn(ServiceData<T> data) throws InterruptedException { + @SuppressWarnings("unchecked") + public <T> void putIn(ServiceData<T> data) throws InterruptedException { - if (data.getDataType() == ServiceData.DataType.service_list) { - LOGGER.debug("putIn service_list queue success"); - serviceListQueue.put((ServiceData<HttpEntity>) data); - } else if (data.getDataType() == ServiceData.DataType.service) { - serviceQueue.put((ServiceData<List<ServiceHealth>>) data); - } else { - LOGGER.warn("DATA TYPE NOT SUPPORT:"+data.getDataType()); - } - } + if (data.getDataType() == ServiceData.DataType.service_list) { + LOGGER.debug("putIn service_list queue success"); + serviceListQueue.put((ServiceData<HttpEntity>) data); + } else if (data.getDataType() == ServiceData.DataType.service) { + serviceQueue.put((ServiceData<List<ServiceHealth>>) data); + } else { + LOGGER.warn("DATA TYPE NOT SUPPORT:" + data.getDataType()); + } + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java index 90a8477..fb8d9a4 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceConsumer.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -33,150 +31,153 @@ import org.slf4j.LoggerFactory; public class ServiceConsumer implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); - private boolean isRunning = true; + private boolean isRunning = true; - private int index; - - - private static final int retryCount=3; + private int index; - //缓存服务信息:key:服务名 和对应的版本列表Set<String> - private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>(); - public ServiceConsumer(final int index) { - this.index = index; - } + private static final int retryCount = 3; + // 缓存服务信息:key:服务名 和对应的版本列表Set<String> + private final Map<String, Set<String>> lastVersionResponse = new HashMap<String, Set<String>>(); - public void run() { + public ServiceConsumer(final int index) { + this.index = index; + } - LOGGER.info("run Service Consumer Thread [" + index + "]"); - while (isRunning) { - try { - ServiceData<List<ServiceHealth>> serviceData; + public void run() { - serviceData = QueueManager.getInstance().takeFromServiceQueue(index); + LOGGER.info("run Service Consumer Thread [" + index + "]"); - // LOGGER.info("Service Consumer Thread [" + index + - // "] take out serviceData from Queue successfully"); + while (isRunning) { + try { + ServiceData<List<ServiceHealth>> serviceData; - if (serviceData.getOperate() == ServiceData.Operate.delete) { - // 删除服务 - deleteMicroService(serviceData); - } else { - // 更新服务 - updateMicroService(serviceData); - } - } catch (InterruptedException e) { - LOGGER.error("ServiceConsumer throw InterruptedException: ", e); - Thread.currentThread().interrupt(); - } + serviceData = QueueManager.getInstance().takeFromServiceQueue(index); - } - } + // LOGGER.info("Service Consumer Thread [" + index + + // "] take out serviceData from Queue successfully"); + if (serviceData.getOperate() == ServiceData.Operate.delete) { + // 删除服务 + deleteMicroService(serviceData); + } else { + // 更新服务 + updateMicroService(serviceData); + } + } catch (InterruptedException e) { + LOGGER.error("ServiceConsumer throw InterruptedException: ", e); + Thread.currentThread().interrupt(); + } + } + } - private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) { - String serviceName = null; - try { - if (serviceData.getData() == null || serviceData.getData().size() == 0) { - throw new Exception("sysn deleteMicroService is wrong:serviceData is empty"); - } - - serviceName = serviceData.getData().get(0).getService().getService(); -// LOGGER.info("Service Consumer [" + index + "] start to delete MicroService:[serviceName] " -// + serviceName); - //ServiceListCache.removeService(serviceName); - MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); - } catch (Exception e) { - LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e); - //删除失败,重试三次 - for(int i=0;i<retryCount;i++){ - + private void deleteMicroService(ServiceData<List<ServiceHealth>> serviceData) { + String serviceName = null; try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage()); - } - if(reDeleteMicroService(serviceName)){ - LOGGER.info((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo success [serviceName]" + serviceName); - break; - } - else{ - LOGGER.error((i+1) + "/"+retryCount+" : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName); + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + throw new Exception("sysn deleteMicroService is wrong:serviceData is empty"); + } + + serviceName = serviceData.getData().get(0).getService().getService(); + // LOGGER.info("Service Consumer [" + index + "] start to delete + // MicroService:[serviceName] " + // + serviceName); + + // ServiceListCache.removeService(serviceName); + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + + } catch (Exception e) { + LOGGER.error("delete MicroServiceInfo 4AllVersion fail from consul:[serviceName]" + serviceName, e); + // 删除失败,重试三次 + for (int i = 0; i < retryCount; i++) { + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOGGER.error("delete MicroServiceInfo 4AllVersion Thread.sleep throw except:" + ex.getMessage()); + } + if (reDeleteMicroService(serviceName)) { + LOGGER.info((i + 1) + "/" + retryCount + " : retry to delete MicroServiceInfo success [serviceName]" + + serviceName); + break; + } else { + LOGGER.error((i + 1) + "/" + retryCount + + " : retry to delete MicroServiceInfo still fail [serviceName]" + serviceName); + } + } } - } } - } - - private boolean reDeleteMicroService(String serviceName){ - try { - MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); - return true; - } catch (Exception e) { - return false; + + private boolean reDeleteMicroService(String serviceName) { + try { + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + return true; + } catch (Exception e) { + return false; + } } - } - private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) { + private void updateMicroService(ServiceData<List<ServiceHealth>> serviceData) { - if (serviceData.getData() == null || serviceData.getData().size() == 0) { - LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty "); - return; - } + if (serviceData.getData() == null || serviceData.getData().size() == 0) { + LOGGER.warn("sysn updateMicroService is wrong:serviceData is empty "); + return; + } + + String serviceName = ""; + + try { + + serviceName = serviceData.getData().get(0).getService().getService(); + List<ServiceHealth> serviceNodeList = serviceData.getData(); + + + Map<String, MicroServiceFullInfo> microServiceInfo4version = + ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList); + + // 删除数据库中已不存在的版本号服务信息 + Set<String> newAllVersion = microServiceInfo4version.keySet(); - String serviceName = ""; + if (lastVersionResponse.containsKey(serviceName)) { + Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName); + // Set<String> + // dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName); + Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet); - try { + if (delVersionList.size() > 0) { - serviceName = serviceData.getData().get(0).getService().getService(); - List<ServiceHealth> serviceNodeList = serviceData.getData(); + LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName + "[version]" + + delVersionList); - Map<String, MicroServiceFullInfo> microServiceInfo4version = - ServiceFilter.getInstance().transMicroServiceInfoFromConsul(serviceNodeList); + for (String version : delVersionList) { + MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version); + } - // 删除数据库中已不存在的版本号服务信息 - Set<String> newAllVersion = microServiceInfo4version.keySet(); + } + } - if (lastVersionResponse.containsKey(serviceName)) { - Set<String> dbAllVersionSet = lastVersionResponse.get(serviceName); - // Set<String> dbAllVersionSet=MicroServiceWrapper.getInstance().getAllVersion(serviceName); - Set<String> delVersionList = CommonUtil.getDiffrent(newAllVersion, dbAllVersionSet); + lastVersionResponse.put(serviceName, newAllVersion); - if (delVersionList.size() > 0) { + for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) { + MicroServiceFullInfo new_microServiceFullInfo = entry.getValue(); + MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo); - LOGGER.info("MicroService version is change from consul:[serviceName]" + serviceName - + "[version]" + delVersionList); + } - for (String version : delVersionList) { - MicroServiceWrapper.getInstance().deleteMicroService(serviceName, version); - } - + } catch (Exception e) { + LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName); + // 更新失败,重置任务服务的modifyIndex,等待重新更新 + RedisHealthCheck.writeCheckFlag = true; + SyncDataManager.resetIndex(serviceName); } - } - - lastVersionResponse.put(serviceName, newAllVersion); - - for (Map.Entry<String, MicroServiceFullInfo> entry : microServiceInfo4version.entrySet()) { - MicroServiceFullInfo new_microServiceFullInfo = entry.getValue(); - MicroServiceWrapper.getInstance().saveServiceAndnoticeRoute(new_microServiceFullInfo); - - } - - - } catch (Exception e) { - LOGGER.error("update MicroServiceInfo fail from consul:[serviceName]" + serviceName); - //更新失败,重置任务服务的modifyIndex,等待重新更新 - RedisHealthCheck.writeCheckFlag = true; - SyncDataManager.resetIndex(serviceName); } - } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceData.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceData.java index be77603..90e8048 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceData.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceData.java @@ -1,68 +1,66 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; public class ServiceData<T> { - public static enum Type { - consul - }; + public static enum Type { + consul + }; - public static enum Operate { - update, delete - }; + public static enum Operate { + update, delete + }; - public static enum DataType { - service_list, service - } + public static enum DataType { + service_list, service + } - private Type type = Type.consul; - private DataType dataType; - private T data; - private Operate operate = Operate.update; + private Type type = Type.consul; + private DataType dataType; + private T data; + private Operate operate = Operate.update; - public Type getType() { - return type; - } + public Type getType() { + return type; + } - public void setType(Type type) { - this.type = type; - } + public void setType(Type type) { + this.type = type; + } - public DataType getDataType() { - return dataType; - } + public DataType getDataType() { + return dataType; + } - public void setDataType(DataType dataType) { - this.dataType = dataType; - } + public void setDataType(DataType dataType) { + this.dataType = dataType; + } - public T getData() { - return data; - } + public T getData() { + return data; + } - public void setData(T data) { - this.data = data; - } + public void setData(T data) { + this.data = data; + } - public Operate getOperate() { - return operate; - } + public Operate getOperate() { + return operate; + } - public void setOperate(Operate operate) { - this.operate = operate; - } + public void setOperate(Operate operate) { + this.operate = operate; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListCache.java index 5f178d9..9d3ebbd 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -24,26 +22,27 @@ import org.slf4j.LoggerFactory; public class ServiceListCache { - - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListCache.class); - - private final static AtomicReference<Set<String>> serviceNameList4Cache = new AtomicReference<Set<String>>(new HashSet<String>()); - - public static Set<String> getLatestServiceNamelist() { - return serviceNameList4Cache.get(); - } - - public static void setLatestServiceNamelist(Set<String> newServicenamelist){ - serviceNameList4Cache.set(newServicenamelist); - LOGGER.info("------current total Watch Service Num :"+ newServicenamelist.size()); - } - - public synchronized static void removeService(String serviceName){ - - Set<String> servicenamelist=serviceNameList4Cache.get(); - servicenamelist.remove(serviceName); - serviceNameList4Cache.set(servicenamelist); - LOGGER.info("------current total Watch Service Num :"+ servicenamelist.size()); + + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListCache.class); + + private final static AtomicReference<Set<String>> serviceNameList4Cache = + new AtomicReference<Set<String>>(new HashSet<String>()); + + public static Set<String> getLatestServiceNamelist() { + return serviceNameList4Cache.get(); + } + + public static void setLatestServiceNamelist(Set<String> newServicenamelist) { + serviceNameList4Cache.set(newServicenamelist); + LOGGER.info("------current total Watch Service Num :" + newServicenamelist.size()); + } + + public synchronized static void removeService(String serviceName) { + + Set<String> servicenamelist = serviceNameList4Cache.get(); + servicenamelist.remove(serviceName); + serviceNameList4Cache.set(servicenamelist); + LOGGER.info("------current total Watch Service Num :" + servicenamelist.size()); } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java index 617a4e5..c673d78 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListConsumer.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -36,189 +34,171 @@ import com.fasterxml.jackson.core.JsonToken; public class ServiceListConsumer implements Runnable { - private static final Logger LOGGER = LoggerFactory - .getLogger(ServiceListConsumer.class); - - private boolean isRunning = true; - - private int index; - - - public ServiceListConsumer() { - this.index = 0; - } - - public void run() { - LOGGER.info("run ServiceList Consumer Thread [" + index + "]"); - - while (isRunning) { - try { - // 取最新一条记录 - ServiceData<HttpEntity> serviceData = QueueManager - .getInstance().takeFromServiceListQueue(index); - LOGGER.debug("ServiceList Consumer Thread [" + index + - "] take out serviceData from Queue successfully"); - - HttpEntity newValues = serviceData.getData(); - - Set<String> newServiceNameList = filterServiceList(newValues); - - if (ServiceListCache.getLatestServiceNamelist().size() == 0) { - boolean initSuccess=initServiceList(newServiceNameList); - if(initSuccess){ - ServiceListCache.setLatestServiceNamelist(newServiceNameList); - } - } else { - updateServiceList(newServiceNameList); - ServiceListCache.setLatestServiceNamelist(newServiceNameList); - } - - - } catch (Exception e) { - LOGGER.error( - "ServiceListConsumer throw Exception: ", e); - } - } - } - - private void startWatchService(String serviceName) { - // start to Watch service nodes - - SyncDataManager.startWatchService(serviceName); - } - - private void updateServiceList(Set<String> newServiceNameList) { - Set<String> registerServiceNameList = CommonUtil.getDiffrent( - ServiceListCache.getLatestServiceNamelist(), newServiceNameList); - - if (registerServiceNameList.size() > 0) { - LOGGER.info("***need to start Watch Service num from consul :" - + registerServiceNameList.size()); - - for (String serviceName : registerServiceNameList) { - startWatchService(serviceName); - } - } - } - - private boolean initServiceList(Set<String> newServiceNameList) { - LOGGER.info("***start to initialize service List when System startup ***"); - - Set<String> dbServiceNameList = MicroServiceWrapper - .getInstance().getAllMicroServiceKey(); - - if(dbServiceNameList==null){ - LOGGER.error("init ServiceList from redis fail "); - return false; - } - - - // 对比删除redis脏数据 - Set<String> delServiceNameList = CommonUtil.getDiffrent( - newServiceNameList, dbServiceNameList); - - LOGGER.info("***need to delete Service num from redis :" - + delServiceNameList.size()); - for (String serviceName : delServiceNameList) { - try { - MicroServiceWrapper.getInstance() - .deleteMicroService4AllVersion(serviceName); - LOGGER.info("delete MicroService success from initialize:[serviceName]" - + serviceName); - - } catch (Exception e) { - LOGGER.error( - "initialize serviceList :Delete MicroServiceInfo serviceName:" - + serviceName + " FAIL : ", e); - } - } - - // 启动同步开启监听全部服务列表 - LOGGER.info("***need to start Watch Service num from initialize :" - + newServiceNameList.size()); - - for (String serviceName : newServiceNameList) { - startWatchService(serviceName); - } - - return true; - - } - - /*private ImmutableSet<String> filterServiceList( - final Map<String, List<String>> serviceList) { - if (serviceList == null || serviceList.isEmpty()) { - return ImmutableSet.of(); - } - - final ImmutableSet.Builder<String> builder = ImmutableSet.builder(); - - for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) { - - String key = entry.getKey(); - if (key != null && !"consul".equals(key)) { - - List<String> value = entry.getValue(); - if (ServiceFilter.getInstance().isFilterService(value)) { - builder.add(key); - } - } - } - - LOGGER.info("consul all service num:" + serviceList.size()); - LOGGER.info("consul filter service num:" + builder.build().size()); - - return builder.build(); - } -*/ - private Set<String> filterServiceList(final HttpEntity serviceList) { - - if (serviceList == null || serviceList.getContentLength() == 0) { - return new HashSet<String>(); - } - - final Set<String> builder = new HashSet<String>(); - - JsonFactory f = new JsonFactory(); - JsonParser jp = null; - List<String> tagList = null; - int inputServiceNum = 0; - try { - jp = f.createParser(serviceList.getContent()); - jp.nextToken(); - while (jp.nextToken() != JsonToken.END_OBJECT) { - String serviceName = jp.getCurrentName(); - inputServiceNum++; - jp.nextToken(); - tagList = new ArrayList<>(); - while (jp.nextToken() != JsonToken.END_ARRAY) { - tagList.add(jp.getText()); - } - - if (serviceName != null && !"consul".equals(serviceName)) { - if (ServiceFilter.getInstance().isFilterService(tagList)) { - builder.add(serviceName); - } - } - } - } catch (IOException e) { - LOGGER.warn("parse service list error",e); - return new HashSet<String>(); - } finally { - try { - jp.close(); - } catch (IOException e) { - LOGGER.warn("parse service list error",e); - return new HashSet<String>(); - } - } - - int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size(); -// if(latestServiceNum!=builder.size()){ - LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum); -// } - - return builder; - } + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListConsumer.class); + + private boolean isRunning = true; + + private int index; + + + public ServiceListConsumer() { + this.index = 0; + } + + public void run() { + LOGGER.info("run ServiceList Consumer Thread [" + index + "]"); + + while (isRunning) { + try { + // 取最新一条记录 + ServiceData<HttpEntity> serviceData = QueueManager.getInstance().takeFromServiceListQueue(index); + LOGGER.debug("ServiceList Consumer Thread [" + index + + "] take out serviceData from Queue successfully"); + + HttpEntity newValues = serviceData.getData(); + + Set<String> newServiceNameList = filterServiceList(newValues); + + if (ServiceListCache.getLatestServiceNamelist().size() == 0) { + boolean initSuccess = initServiceList(newServiceNameList); + if (initSuccess) { + ServiceListCache.setLatestServiceNamelist(newServiceNameList); + } + } else { + updateServiceList(newServiceNameList); + ServiceListCache.setLatestServiceNamelist(newServiceNameList); + } + + + } catch (Exception e) { + LOGGER.error("ServiceListConsumer throw Exception: ", e); + } + } + } + + private void startWatchService(String serviceName) { + // start to Watch service nodes + + SyncDataManager.startWatchService(serviceName); + } + + private void updateServiceList(Set<String> newServiceNameList) { + Set<String> registerServiceNameList = + CommonUtil.getDiffrent(ServiceListCache.getLatestServiceNamelist(), newServiceNameList); + + if (registerServiceNameList.size() > 0) { + LOGGER.info("***need to start Watch Service num from consul :" + registerServiceNameList.size()); + + for (String serviceName : registerServiceNameList) { + startWatchService(serviceName); + } + } + } + + private boolean initServiceList(Set<String> newServiceNameList) { + LOGGER.info("***start to initialize service List when System startup ***"); + + Set<String> dbServiceNameList = MicroServiceWrapper.getInstance().getAllMicroServiceKey(); + + if (dbServiceNameList == null) { + LOGGER.error("init ServiceList from redis fail "); + return false; + } + + + // 对比删除redis脏数据 + Set<String> delServiceNameList = CommonUtil.getDiffrent(newServiceNameList, dbServiceNameList); + + LOGGER.info("***need to delete Service num from redis :" + delServiceNameList.size()); + for (String serviceName : delServiceNameList) { + try { + MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName); + LOGGER.info("delete MicroService success from initialize:[serviceName]" + serviceName); + + } catch (Exception e) { + LOGGER.error("initialize serviceList :Delete MicroServiceInfo serviceName:" + serviceName + " FAIL : ", + e); + } + } + + // 启动同步开启监听全部服务列表 + LOGGER.info("***need to start Watch Service num from initialize :" + newServiceNameList.size()); + + for (String serviceName : newServiceNameList) { + startWatchService(serviceName); + } + + return true; + + } + + /* + * private ImmutableSet<String> filterServiceList( final Map<String, List<String>> serviceList) + * { if (serviceList == null || serviceList.isEmpty()) { return ImmutableSet.of(); } + * + * final ImmutableSet.Builder<String> builder = ImmutableSet.builder(); + * + * for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) { + * + * String key = entry.getKey(); if (key != null && !"consul".equals(key)) { + * + * List<String> value = entry.getValue(); if + * (ServiceFilter.getInstance().isFilterService(value)) { builder.add(key); } } } + * + * LOGGER.info("consul all service num:" + serviceList.size()); + * LOGGER.info("consul filter service num:" + builder.build().size()); + * + * return builder.build(); } + */ + private Set<String> filterServiceList(final HttpEntity serviceList) { + + if (serviceList == null || serviceList.getContentLength() == 0) { + return new HashSet<String>(); + } + + final Set<String> builder = new HashSet<String>(); + + JsonFactory f = new JsonFactory(); + JsonParser jp = null; + List<String> tagList = null; + int inputServiceNum = 0; + try { + jp = f.createParser(serviceList.getContent()); + jp.nextToken(); + while (jp.nextToken() != JsonToken.END_OBJECT) { + String serviceName = jp.getCurrentName(); + inputServiceNum++; + jp.nextToken(); + tagList = new ArrayList<>(); + while (jp.nextToken() != JsonToken.END_ARRAY) { + tagList.add(jp.getText()); + } + + if (serviceName != null && !"consul".equals(serviceName)) { + if (ServiceFilter.getInstance().isFilterService(tagList)) { + builder.add(serviceName); + } + } + } + } catch (IOException e) { + LOGGER.warn("parse service list error", e); + return new HashSet<String>(); + } finally { + try { + jp.close(); + } catch (IOException e) { + LOGGER.warn("parse service list error", e); + return new HashSet<String>(); + } + } + + int latestServiceNum = ServiceListCache.getLatestServiceNamelist().size(); + // if(latestServiceNum!=builder.size()){ + LOGGER.info("[consul] all service num:" + inputServiceNum + ", filter service num: new——" + builder.size() + + " old——" + latestServiceNum); + // } + + return builder; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListQueue.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListQueue.java index 8c802bd..04e23de 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListQueue.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceListQueue.java @@ -1,22 +1,18 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; -import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.http.HttpEntity; @@ -25,50 +21,48 @@ import org.slf4j.LoggerFactory; public class ServiceListQueue extends BaseQueue<HttpEntity> { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListQueue.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListQueue.class); - - private static final int SERVICE_LIST_DATA_QUEUE_NUM = 1; - private static final int SERVICE_LIST_QUEUE_INDEX = 0; - public ServiceListQueue(final int queueCapacity) { - super(SERVICE_LIST_DATA_QUEUE_NUM,queueCapacity); - } + private static final int SERVICE_LIST_DATA_QUEUE_NUM = 1; + private static final int SERVICE_LIST_QUEUE_INDEX = 0; - @Override - public void put(ServiceData<HttpEntity> data) throws InterruptedException { - BlockingQueue<ServiceData<HttpEntity>> queue=getQueue(SERVICE_LIST_QUEUE_INDEX); - - int size=queue.size(); -// LOGGER.info("before put ServiceListQueue[size:"+size+"] success :[service num]"+data.getData().size()); - //先清空队列 - if(size>0){ - queue.clear(); + public ServiceListQueue(final int queueCapacity) { + super(SERVICE_LIST_DATA_QUEUE_NUM, queueCapacity); } - //插入记录 - queue.put(data); - - } - @Override - public ServiceData<HttpEntity> take(int queueIndex) throws InterruptedException { - BlockingQueue<ServiceData<HttpEntity>> queue = getQueue(queueIndex); - ServiceData<HttpEntity> serviceData = queue.take(); - return serviceData; - - /*//取队列最新一条数据 - if (queue.isEmpty()) { - LOGGER.info("take a single serviceData from ServiceListQueue "); - return serviceData; - } else { - List<ServiceData<Map<String, List<String>>>> serviceDataList = - new ArrayList<ServiceData<Map<String, List<String>>>>(); - //一次性从BlockingQueue获取所有数据 - queue.drainTo(serviceDataList); - LOGGER.info("take multiple serviceDatas from ServiceListQueue :[num]"+serviceDataList.size()); - return serviceDataList.get(serviceDataList.size() - 1); - }*/ - } + @Override + public void put(ServiceData<HttpEntity> data) throws InterruptedException { + BlockingQueue<ServiceData<HttpEntity>> queue = getQueue(SERVICE_LIST_QUEUE_INDEX); + + int size = queue.size(); + // LOGGER.info("before put ServiceListQueue[size:"+size+"] success :[service + // num]"+data.getData().size()); + // 先清空队列 + if (size > 0) { + queue.clear(); + } + // 插入记录 + queue.put(data); + + } + + @Override + public ServiceData<HttpEntity> take(int queueIndex) throws InterruptedException { + BlockingQueue<ServiceData<HttpEntity>> queue = getQueue(queueIndex); + ServiceData<HttpEntity> serviceData = queue.take(); + return serviceData; + + /* + * //取队列最新一条数据 if (queue.isEmpty()) { + * LOGGER.info("take a single serviceData from ServiceListQueue "); return serviceData; } + * else { List<ServiceData<Map<String, List<String>>>> serviceDataList = new + * ArrayList<ServiceData<Map<String, List<String>>>>(); //一次性从BlockingQueue获取所有数据 + * queue.drainTo(serviceDataList); + * LOGGER.info("take multiple serviceDatas from ServiceListQueue :[num]"+serviceDataList. + * size()); return serviceDataList.get(serviceDataList.size() - 1); } + */ + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java index 2282ae9..b1e9144 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.queue; @@ -24,37 +22,41 @@ import org.slf4j.LoggerFactory; public class ServiceQueue extends BaseQueue<List<ServiceHealth>> { - - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceQueue.class); - - private int queneNum; - - public ServiceQueue(final int queneNum,final int queueCapacity) { - super(queneNum,queueCapacity); - this.queneNum=queneNum; - } - - - @Override - public void put(final ServiceData<List<ServiceHealth>> data) throws InterruptedException { - if(data.getData()==null || data.getData().size()==0) return; - - String serviceName = data.getData().get(0).getService().getService(); - long serviceNameHashCode=serviceName.hashCode() & 0x7FFFFFFF; - int queneIndex=(int) (serviceNameHashCode % queneNum); - -// LOGGER.info("put ServiceQueue [serviceName.hashCode():"+serviceNameHashCode+",queneIndex:"+queneIndex+",queneNum:"+queneNum+"] :[serviceName]"+serviceName); - - BlockingQueue<ServiceData<List<ServiceHealth>>> queue=getQueue(queneIndex); - queue.put(data); - - LOGGER.info("put ServiceQueue[index:"+queneIndex+",size:"+queue.size()+"] success :[serviceName]"+serviceName); - } - - @Override - public ServiceData<List<ServiceHealth>> take(final int queueIndex) throws InterruptedException { - return getQueue(queueIndex).take(); - } - + + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceQueue.class); + + private int queneNum; + + public ServiceQueue(final int queneNum, final int queueCapacity) { + super(queneNum, queueCapacity); + this.queneNum = queneNum; + } + + + @Override + public void put(final ServiceData<List<ServiceHealth>> data) throws InterruptedException { + if (data.getData() == null || data.getData().size() == 0) + return; + + String serviceName = data.getData().get(0).getService().getService(); + long serviceNameHashCode = serviceName.hashCode() & 0x7FFFFFFF; + int queneIndex = (int) (serviceNameHashCode % queneNum); + + // LOGGER.info("put ServiceQueue + // [serviceName.hashCode():"+serviceNameHashCode+",queneIndex:"+queneIndex+",queneNum:"+queneNum+"] + // :[serviceName]"+serviceName); + + BlockingQueue<ServiceData<List<ServiceHealth>>> queue = getQueue(queneIndex); + queue.put(data); + + LOGGER.info("put ServiceQueue[index:" + queneIndex + ",size:" + queue.size() + "] success :[serviceName]" + + serviceName); + } + + @Override + public ServiceData<List<ServiceHealth>> take(final int queueIndex) throws InterruptedException { + return getQueue(queueIndex).take(); + } + } |