diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose')
8 files changed, 557 insertions, 632 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java index 5730b4b..49246cf 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -33,81 +31,69 @@ import com.orbitz.consul.model.health.ImmutableNode; -public class CheckServiceDataEmptyAndAutoStopWatchFilter implements - WatchTask.Filter<List<ServiceHealth>> { - - private final static Logger LOGGER = LoggerFactory - .getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class); - private final String serviceName; - - public CheckServiceDataEmptyAndAutoStopWatchFilter( - final String serviceName) { - this.serviceName = serviceName; - } - - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub - boolean result = check(object); - - if (!result) { - // create delete - writeServiceToQueue4Del(); - // stop watch - SyncDataManager.stopWatchService(serviceName); - } - - return result; - } - - // when: - // 1)service had been deleted - // 2)service Health check was not passing - // single service return [],size==0 - // stop this service watching task and create delete event - private boolean check(ConsulResponse<List<ServiceHealth>> object) { - boolean result = true; - - if (object == null || object.getResponse() == null - || object.getResponse().size() == 0) { - LOGGER.info("check service-{},its data is empty", - serviceName); - return false; - } - - return result; - } - - private void writeServiceToQueue4Del() { - ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); - data.setDataType(ServiceData.DataType.service); - data.setOperate(ServiceData.Operate.delete); - - // tell the subsequent operation the service name which will be deleted - Service service = ImmutableService.builder().id("").port(0).address("") - .service(serviceName).addTags("").createIndex(0).modifyIndex(0).build(); - ServiceHealth serviceHealth = ImmutableServiceHealth.builder() - .service(service) - .node(ImmutableNode.builder().node("").address("").build()) - .build(); - List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>(); - serviceHealthList.add(serviceHealth); - - data.setData(serviceHealthList); - - LOGGER.info("put delete service[" - + serviceName - + "] to service queue :because of deleted "); - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.warn( - "put delete service[" - + serviceName - + "] to service queue interrupted because of deleted:", - e); - } - } +public class CheckServiceDataEmptyAndAutoStopWatchFilter implements WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory.getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class); + private final String serviceName; + + public CheckServiceDataEmptyAndAutoStopWatchFilter(final String serviceName) { + this.serviceName = serviceName; + } + + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + boolean result = check(object); + + if (!result) { + // create delete + writeServiceToQueue4Del(); + // stop watch + SyncDataManager.stopWatchService(serviceName); + } + + return result; + } + + // when: + // 1)service had been deleted + // 2)service Health check was not passing + // single service return [],size==0 + // stop this service watching task and create delete event + private boolean check(ConsulResponse<List<ServiceHealth>> object) { + boolean result = true; + + if (object == null || object.getResponse() == null || object.getResponse().size() == 0) { + LOGGER.info("check service-{},its data is empty", serviceName); + return false; + } + + return result; + } + + private void writeServiceToQueue4Del() { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setDataType(ServiceData.DataType.service); + data.setOperate(ServiceData.Operate.delete); + + // tell the subsequent operation the service name which will be deleted + Service service = ImmutableService.builder().id("").port(0).address("").service(serviceName).addTags("") + .createIndex(0).modifyIndex(0).build(); + ServiceHealth serviceHealth = ImmutableServiceHealth.builder().service(service) + .node(ImmutableNode.builder().node("").address("").build()).build(); + List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>(); + serviceHealthList.add(serviceHealth); + + data.setData(serviceHealthList); + + LOGGER.info("put delete service[" + serviceName + "] to service queue :because of deleted "); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn("put delete service[" + serviceName + "] to service queue interrupted because of deleted:", + e); + } + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java index 6dfc86a..49f3aa4 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java @@ -1,24 +1,21 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; import java.util.ArrayList; import java.util.List; -import org.onap.msb.apiroute.SyncDataManager; import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; import org.onap.msb.apiroute.wrapper.queue.QueueManager; import org.onap.msb.apiroute.wrapper.queue.ServiceData; @@ -29,83 +26,74 @@ import org.slf4j.LoggerFactory; import com.orbitz.consul.model.ConsulResponse; -public class CheckTagAndAutoStopWatchFilter implements - WatchTask.Filter<List<ServiceHealth>> { - - private final static Logger LOGGER = LoggerFactory - .getLogger(CheckTagAndAutoStopWatchFilter.class); - - private final String serviceName; - - public CheckTagAndAutoStopWatchFilter(final String serviceName) { - this.serviceName = serviceName; - } - - // from consul,the response data:List<ServiceHealth> - // filter ServiceHealth list and find the ServiceHealths which satisfy the - // tags conditions - // 1)if all ServiceHealth don't satisfy,create delete event and stop watch - // 2)if have some ServiceHealths satisfy the tags conditions,create update - // event and send these ServiceHealths - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub - - // find #ServiceHealth# which satisfy the tag conditions - List<ServiceHealth> satisfyList = getSatisfyList(object); - - // no satisfied ServiceHealth - if (satisfyList.isEmpty()) { - - LOGGER.info("put delete service[" - + serviceName - + "] to service queue :because of NO tag meet the conditions"); - - // create delete - writeServiceToQueue(object.getResponse(), - ServiceData.Operate.delete); - // stop watch - //SyncDataManager.stopWatchService(serviceName); - return false; - } - - LOGGER.info("put update service[" - + serviceName - + "] to service queue :which tags meet the conditions"); - - // put the satisfy list to queue - writeServiceToQueue(satisfyList, ServiceData.Operate.update); - - return true; - } - - private List<ServiceHealth> getSatisfyList( - ConsulResponse<List<ServiceHealth>> object) { - List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>(); - for (ServiceHealth health : object.getResponse()) { - - if (ServiceFilter.getInstance().isFilterCheck(health)) { - satisfyList.add(health); - } - } - - return satisfyList; - } - - private void writeServiceToQueue(List<ServiceHealth> serviceData, - Operate operate) { - ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); - data.setOperate(operate); - data.setDataType(ServiceData.DataType.service); - data.setData(serviceData); - - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - LOGGER.warn("put " + operate + " service[" + serviceName - + "] to service queue interrupted ", e); - } - - } +public class CheckTagAndAutoStopWatchFilter implements WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory.getLogger(CheckTagAndAutoStopWatchFilter.class); + + private final String serviceName; + + public CheckTagAndAutoStopWatchFilter(final String serviceName) { + this.serviceName = serviceName; + } + + // from consul,the response data:List<ServiceHealth> + // filter ServiceHealth list and find the ServiceHealths which satisfy the + // tags conditions + // 1)if all ServiceHealth don't satisfy,create delete event and stop watch + // 2)if have some ServiceHealths satisfy the tags conditions,create update + // event and send these ServiceHealths + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + + // find #ServiceHealth# which satisfy the tag conditions + List<ServiceHealth> satisfyList = getSatisfyList(object); + + // no satisfied ServiceHealth + if (satisfyList.isEmpty()) { + + LOGGER.info("put delete service[" + serviceName + + "] to service queue :because of NO tag meet the conditions"); + + // create delete + writeServiceToQueue(object.getResponse(), ServiceData.Operate.delete); + // stop watch + // SyncDataManager.stopWatchService(serviceName); + return false; + } + + LOGGER.info("put update service[" + serviceName + "] to service queue :which tags meet the conditions"); + + // put the satisfy list to queue + writeServiceToQueue(satisfyList, ServiceData.Operate.update); + + return true; + } + + private List<ServiceHealth> getSatisfyList(ConsulResponse<List<ServiceHealth>> object) { + List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>(); + for (ServiceHealth health : object.getResponse()) { + + if (ServiceFilter.getInstance().isFilterCheck(health)) { + satisfyList.add(health); + } + } + + return satisfyList; + } + + private void writeServiceToQueue(List<ServiceHealth> serviceData, Operate operate) { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setOperate(operate); + data.setDataType(ServiceData.DataType.service); + data.setData(serviceData); + + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + LOGGER.warn("put " + operate + " service[" + serviceName + "] to service queue interrupted ", e); + } + + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java index 08c27a7..ccba7c9 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -25,39 +23,36 @@ import com.orbitz.consul.model.ConsulResponse; public class ConsulIndexFilter<T> implements WatchTask.Filter<T> { - private final static Logger LOGGER = LoggerFactory - .getLogger(ConsulIndexFilter.class); - - private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>( - null); - - @Override - public boolean filter(final ConsulResponse<T> object) { - // TODO Auto-generated method stub - return isChanged(object); - } - - private boolean isChanged(final ConsulResponse<T> consulResponse) { - - if (consulResponse != null && consulResponse.getIndex() != null - && !consulResponse.getIndex().equals(latestIndex.get())) { - - if(LOGGER.isDebugEnabled()){ - //第一次不打印 - if (latestIndex.get()!=null) { - LOGGER.debug("consul index compare:new-" - + consulResponse.getIndex() + " old-" - + latestIndex.get()); - } - - } - - this.latestIndex.set(consulResponse.getIndex()); - return true; - } - - return false; - } - - + private final static Logger LOGGER = LoggerFactory.getLogger(ConsulIndexFilter.class); + + private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null); + + @Override + public boolean filter(final ConsulResponse<T> object) { + // TODO Auto-generated method stub + return isChanged(object); + } + + private boolean isChanged(final ConsulResponse<T> consulResponse) { + + if (consulResponse != null && consulResponse.getIndex() != null + && !consulResponse.getIndex().equals(latestIndex.get())) { + + if (LOGGER.isDebugEnabled()) { + // 第一次不打印 + if (latestIndex.get() != null) { + LOGGER.debug("consul index compare:new-" + consulResponse.getIndex() + " old-" + + latestIndex.get()); + } + + } + + this.latestIndex.set(consulResponse.getIndex()); + return true; + } + + return false; + } + + } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java index 6f90b80..6694b68 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -30,107 +28,104 @@ import com.orbitz.consul.model.health.HealthCheck; public class ServiceModifyIndexFilter implements WatchTask.Filter<List<ServiceHealth>> { - private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse = - new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of()); + private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse = + new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of()); - private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class); + private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class); - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub - List<ServiceHealth> newList=object.getResponse(); - if(realFilter(newList)){ - lastResponse.set(ImmutableList.copyOf(newList)); - return true; - } - - return false; - } - - private boolean realFilter(List<ServiceHealth> newList) { - // 1)判断list的size,不等则改变 - if (newList.size() != lastResponse.get().size()) { - // 第一次不打印 - if (lastResponse.get().size() != 0) { - LOGGER.info(newList.get(0).getService().getService() - + " instance count is different.new_count:" + newList.size() + " old_count:" - + lastResponse.get().size()); - } - - return true; + List<ServiceHealth> newList = object.getResponse(); + if (realFilter(newList)) { + lastResponse.set(ImmutableList.copyOf(newList)); + return true; + } + + return false; } - - - // 2)循环服务实例判断服务内容和健康检查是否改变 - for (ServiceHealth newData : newList) { - ServiceHealth sameIdOldData = findSameIdInOldList(newData); - // 若在oldlist中不存在,则改变 - if (sameIdOldData == null) { - - LOGGER.info(newData.getService().getId() - + " is a new service instance.the createindex:" - + newData.getService().getCreateIndex() - + " the modifyIndex:" - + newData.getService().getModifyIndex()); - - return true; - } - - // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变 - if(!compareService(newData,sameIdOldData)){ - LOGGER.info(newData.getService().getId() +" instance is change because of modifyIndex or health check" ); - return true; - } + + private boolean realFilter(List<ServiceHealth> newList) { + // 1)判断list的size,不等则改变 + if (newList.size() != lastResponse.get().size()) { + // 第一次不打印 + if (lastResponse.get().size() != 0) { + LOGGER.info(newList.get(0).getService().getService() + " instance count is different.new_count:" + + newList.size() + " old_count:" + lastResponse.get().size()); + } + + return true; + } + + + // 2)循环服务实例判断服务内容和健康检查是否改变 + for (ServiceHealth newData : newList) { + ServiceHealth sameIdOldData = findSameIdInOldList(newData); + // 若在oldlist中不存在,则改变 + if (sameIdOldData == null) { + + LOGGER.info(newData.getService().getId() + " is a new service instance.the createindex:" + + newData.getService().getCreateIndex() + " the modifyIndex:" + + newData.getService().getModifyIndex()); + + return true; + } + + // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变 + if (!compareService(newData, sameIdOldData)) { + LOGGER.info(newData.getService().getId() + + " instance is change because of modifyIndex or health check"); + return true; + } + } + + return false; + + } - - return false; - } + private boolean compareService(ServiceHealth oldData, ServiceHealth newData) { + + return compareServiceInfo(oldData.getService(), newData.getService()) + && compareServiceHealthStatus(oldData.getChecks(), newData.getChecks()); + } - private boolean compareService(ServiceHealth oldData,ServiceHealth newData) { - - return compareServiceInfo(oldData.getService(),newData.getService()) && - compareServiceHealthStatus(oldData.getChecks(),newData.getChecks()); - - } - - private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) { - if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) { - LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:" - + newServiceInfo.getModifyIndex() + " old_modifyIndex:" - + oldServiceInfo.getModifyIndex()); - return false; + private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) { + if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) { + LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:" + newServiceInfo.getModifyIndex() + + " old_modifyIndex:" + oldServiceInfo.getModifyIndex()); + return false; + } + return true; } - return true; - } - - private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) { - boolean oldHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(oldData); - boolean newHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(newData); - return oldHealthCheck==newHealthCheck; - - } - - - private ServiceHealth findSameIdInOldList(ServiceHealth newData) { - for (ServiceHealth oldData : lastResponse.get()) { - if (oldData.getService().getId().equals(newData.getService().getId())) { - return oldData; - } + + private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) { + boolean oldHealthCheck = ServiceFilter.getInstance().isFilterHealthCheck(oldData); + boolean newHealthCheck = ServiceFilter.getInstance().isFilterHealthCheck(newData); + return oldHealthCheck == newHealthCheck; + } - return null; - } - public boolean resetModifyIndex() { - // clear last response - lastResponse.set(ImmutableList.<ServiceHealth>of()); - return true; - } + private ServiceHealth findSameIdInOldList(ServiceHealth newData) { + for (ServiceHealth oldData : lastResponse.get()) { + if (oldData.getService().getId().equals(newData.getService().getId())) { + return oldData; + } + } + + return null; + } + + public boolean resetModifyIndex() { + // clear last response + lastResponse.set(ImmutableList.<ServiceHealth>of()); + return true; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java index 678bb87..5cf4017 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java @@ -1,105 +1,88 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; import org.apache.http.HttpEntity; import org.onap.msb.apiroute.wrapper.consulextend.CatalogClient; -import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache; import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; -public class WatchCatalogServicesTask extends WatchTask<HttpEntity> { +public class WatchCatalogServicesTask extends WatchTask<HttpEntity> { + + private final static Logger LOGGER = LoggerFactory.getLogger(WatchCatalogServicesTask.class); + + private ServicesCatalogCache servicesCache = null; + + public WatchCatalogServicesTask(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + initCache(catalogClient, catalogOptions, queryOptions, watchSeconds); + } + + public WatchCatalogServicesTask(final CatalogClient catalogClient, final int watchSeconds) { + initCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, watchSeconds); + } + + public WatchCatalogServicesTask(final CatalogClient catalogClient) { + initCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, 10); + } + + private ServicesCatalogCache initCache(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + LOGGER.info("************create all services watch task*****************"); + servicesCache = ServicesCatalogCache.newCache(catalogClient, catalogOptions, queryOptions, watchSeconds); + + servicesCache.addListener((Listener<HttpEntity>) new InternalListener()); - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchCatalogServicesTask.class); - - private ServicesCatalogCache servicesCache = null; - - public WatchCatalogServicesTask( - final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) - { - initCache(catalogClient,catalogOptions,queryOptions,watchSeconds); - } - - public WatchCatalogServicesTask( - final CatalogClient catalogClient, - final int watchSeconds) - { - initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,watchSeconds); - } - - public WatchCatalogServicesTask( - final CatalogClient catalogClient) - { - initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,10); - } - - private ServicesCatalogCache initCache(final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) { - LOGGER.info("************create all services watch task*****************"); - servicesCache = ServicesCatalogCache.newCache(catalogClient, - catalogOptions, queryOptions, watchSeconds); + return servicesCache; + } - servicesCache - .addListener((Listener<HttpEntity>) new InternalListener()); + @Override + public boolean startWatch() { + // TODO Auto-generated method stub + if (servicesCache != null) { + try { + servicesCache.start(); + LOGGER.info("************start all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service list watch failed:", e); + } + } - return servicesCache; - } - - @Override - public boolean startWatch() { - // TODO Auto-generated method stub - if(servicesCache!=null) - { - try { - servicesCache.start(); - LOGGER.info("************start all services watch task*****************"); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("start service list watch failed:", e); - } - } - - return false; - } + return false; + } - @Override - public boolean stopWatch() { - // TODO Auto-generated method stub - if (servicesCache != null) { - try { - servicesCache.stop(); - LOGGER.info("************stop all services watch task*****************"); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("stop service list watch failed:", e); - } - } - return false; - } + @Override + public boolean stopWatch() { + // TODO Auto-generated method stub + if (servicesCache != null) { + try { + servicesCache.stop(); + LOGGER.info("************stop all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service list watch failed:", e); + } + } + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java index 73a5176..9fad93d 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -19,8 +17,8 @@ import java.math.BigInteger; import java.util.List; import org.onap.msb.apiroute.wrapper.consulextend.HealthClient; -import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache; import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache; import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,115 +27,102 @@ import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; public class WatchServiceHealthTask extends WatchTask<List<ServiceHealth>> { - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchServiceHealthTask.class); - - private ServiceHealthCache serviceHealthCache = null; - private String serviceName=""; - - public String getServiceName() { - return serviceName; - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName,final boolean passing, - final CatalogOptions catalogOptions, final int watchSeconds, - final QueryOptions queryOptions) { - initCache(healthClient, serviceName, passing, catalogOptions, - watchSeconds, queryOptions); - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName,final boolean passing, - final int watchSeconds) - - { - initCache(healthClient, serviceName, passing, CatalogOptions.BLANK, - watchSeconds, QueryOptions.BLANK); - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName, final int watchSeconds) - - { - initCache(healthClient, serviceName, true, CatalogOptions.BLANK, - watchSeconds, QueryOptions.BLANK); - } - - private ServiceHealthCache initCache(final HealthClient healthClient, - final String serviceName,final boolean passing, - final CatalogOptions catalogOptions, final int watchSeconds, - final QueryOptions queryOptions) { -// LOGGER.info("************create {} watch task*****************",serviceName); - this.serviceName = serviceName; - serviceHealthCache = ServiceHealthCache.newCache(healthClient, - serviceName, passing, catalogOptions, watchSeconds, - queryOptions); - - serviceHealthCache - .addListener((Listener<List<ServiceHealth>>) new InternalListener()); - - return serviceHealthCache; - } - - public boolean startWatch() { - - if(serviceHealthCache!=null) - { - try { - serviceHealthCache.start(); - LOGGER.info("************start {} watch task*****************",serviceName); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("start service watch failed:", e); - } - } - - return false; - - } - - public boolean stopWatch(){ - if (serviceHealthCache != null) { - try { - serviceHealthCache.stop(); - LOGGER.info("************stop {} watch task*****************",serviceName); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("stop service watch failed:", e); - } - } - - return false; - } - - - public boolean resetIndex() - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset " + serviceName + " consul index"); - } - - //reset consul index - serviceHealthCache.updateIndex(BigInteger.valueOf(0)); - - - //reset modify index - for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) { - if (filter instanceof ServiceModifyIndexFilter) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset " + serviceName + " modify index"); - } - return ((ServiceModifyIndexFilter) filter).resetModifyIndex(); - } - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset modify index.did not find filter:" + serviceName); - } - - return false; - } + private final static Logger LOGGER = LoggerFactory.getLogger(WatchServiceHealthTask.class); + + private ServiceHealthCache serviceHealthCache = null; + private String serviceName = ""; + + public String getServiceName() { + return serviceName; + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final boolean passing, + final CatalogOptions catalogOptions, final int watchSeconds, final QueryOptions queryOptions) { + initCache(healthClient, serviceName, passing, catalogOptions, watchSeconds, queryOptions); + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final boolean passing, + final int watchSeconds) + + { + initCache(healthClient, serviceName, passing, CatalogOptions.BLANK, watchSeconds, QueryOptions.BLANK); + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final int watchSeconds) + + { + initCache(healthClient, serviceName, true, CatalogOptions.BLANK, watchSeconds, QueryOptions.BLANK); + } + + private ServiceHealthCache initCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { + // LOGGER.info("************create {} watch task*****************",serviceName); + this.serviceName = serviceName; + serviceHealthCache = ServiceHealthCache.newCache(healthClient, serviceName, passing, catalogOptions, + watchSeconds, queryOptions); + + serviceHealthCache.addListener((Listener<List<ServiceHealth>>) new InternalListener()); + + return serviceHealthCache; + } + + public boolean startWatch() { + + if (serviceHealthCache != null) { + try { + serviceHealthCache.start(); + LOGGER.info("************start {} watch task*****************", serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service watch failed:", e); + } + } + + return false; + + } + + public boolean stopWatch() { + if (serviceHealthCache != null) { + try { + serviceHealthCache.stop(); + LOGGER.info("************stop {} watch task*****************", serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service watch failed:", e); + } + } + + return false; + } + + + public boolean resetIndex() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " consul index"); + } + + // reset consul index + serviceHealthCache.updateIndex(BigInteger.valueOf(0)); + + + // reset modify index + for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) { + if (filter instanceof ServiceModifyIndexFilter) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " modify index"); + } + return ((ServiceModifyIndexFilter) filter).resetModifyIndex(); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find filter:" + serviceName); + } + + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java index f565335..f12f95f 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -24,79 +22,77 @@ import org.slf4j.LoggerFactory; import com.orbitz.consul.model.ConsulResponse; public abstract class WatchTask<T> { - private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>(); - private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>(); - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchTask.class); - - //start - public abstract boolean startWatch(); - - //stop - public abstract boolean stopWatch(); - - // filters - public interface Filter<T> { - public boolean filter(final ConsulResponse<T> object); - } - - public boolean addFilter(Filter<T> filter) { - boolean added = filters.add(filter); - return added; - } - - public void removeAllFilter() { - filters.clear(); - } - - - public final CopyOnWriteArrayList<Filter<T>> getAllFilters(){ - return filters; - } - - // handlers - public interface Handler<T> { - void handle(final ConsulResponse<T> object); - } - - public boolean addHandler(Handler<T> handler) { - boolean added = handlers.add(handler); - return added; - } - - public void removeAllHandler() { - handlers.clear(); - } - - // internal listener - protected class InternalListener implements ConsulCache.Listener<T> { - @Override - public void notify(ConsulResponse<T> newValues) { - - long startTime = System.currentTimeMillis(); - - // filter - for (Filter<T> f : filters) { - // false,return - if (!f.filter(newValues)) { - return; - } - } - - // handle - for (Handler<T> h : handlers) { - h.handle(newValues); - } - - long endTime = System.currentTimeMillis(); - - if(endTime-startTime > 10*1000) - { - LOGGER.info("WatchTask THEAD WORK TIMEOUT"); - } - } - - } + private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>(); + private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>(); + private final static Logger LOGGER = LoggerFactory.getLogger(WatchTask.class); + + // start + public abstract boolean startWatch(); + + // stop + public abstract boolean stopWatch(); + + // filters + public interface Filter<T> { + public boolean filter(final ConsulResponse<T> object); + } + + public boolean addFilter(Filter<T> filter) { + boolean added = filters.add(filter); + return added; + } + + public void removeAllFilter() { + filters.clear(); + } + + + public final CopyOnWriteArrayList<Filter<T>> getAllFilters() { + return filters; + } + + // handlers + public interface Handler<T> { + void handle(final ConsulResponse<T> object); + } + + public boolean addHandler(Handler<T> handler) { + boolean added = handlers.add(handler); + return added; + } + + public void removeAllHandler() { + handlers.clear(); + } + + // internal listener + protected class InternalListener implements ConsulCache.Listener<T> { + @Override + public void notify(ConsulResponse<T> newValues) { + + long startTime = System.currentTimeMillis(); + + // filter + for (Filter<T> f : filters) { + // false,return + if (!f.filter(newValues)) { + return; + } + } + + // handle + for (Handler<T> h : handlers) { + h.handle(newValues); + } + + long endTime = System.currentTimeMillis(); + + if (endTime - startTime > 10 * 1000) { + LOGGER.info("WatchTask THEAD WORK TIMEOUT"); + } + } + + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java index c4df452..517003a 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -24,28 +22,27 @@ import com.orbitz.consul.model.ConsulResponse; public class WriteBufferHandler<T> implements WatchTask.Handler<T> { - private static final Logger LOGGER = LoggerFactory - .getLogger(WriteBufferHandler.class); - private final ServiceData.DataType dataType; - - - public WriteBufferHandler(final ServiceData.DataType dataType) { - this.dataType =dataType; - } - - @Override - public void handle(ConsulResponse<T> object) { - // TODO Auto-generated method stub - ServiceData<T> data = new ServiceData<T>(); - data.setDataType(dataType); - data.setData(object.getResponse()); - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.warn("put data to buffer interrupted:", e); - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(WriteBufferHandler.class); + private final ServiceData.DataType dataType; + + + public WriteBufferHandler(final ServiceData.DataType dataType) { + this.dataType = dataType; + } + + @Override + public void handle(ConsulResponse<T> object) { + // TODO Auto-generated method stub + ServiceData<T> data = new ServiceData<T>(); + data.setDataType(dataType); + data.setData(object.getResponse()); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn("put data to buffer interrupted:", e); + } + } } |