aboutsummaryrefslogtreecommitdiffstats
path: root/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose
diff options
context:
space:
mode:
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose')
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java98
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java96
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java48
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java121
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java90
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java128
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java87
-rw-r--r--apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java36
8 files changed, 704 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java
new file mode 100644
index 0000000..01b5168
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java
@@ -0,0 +1,98 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.msb.apiroute.SyncDataManager;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ImmutableService;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ImmutableServiceHealth;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.Service;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
+import org.onap.msb.apiroute.wrapper.queue.QueueManager;
+import org.onap.msb.apiroute.wrapper.queue.ServiceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.health.ImmutableNode;
+
+
+
+public class CheckServiceDataEmptyAndAutoStopWatchFilter implements
+ WatchTask.Filter<List<ServiceHealth>> {
+
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class);
+ private final String serviceName;
+
+ public CheckServiceDataEmptyAndAutoStopWatchFilter(
+ final String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean filter(ConsulResponse<List<ServiceHealth>> object) {
+ // TODO Auto-generated method stub
+ boolean result = check(object);
+
+ if (!result) {
+ // create delete
+ writeServiceToQueue4Del();
+ // stop watch
+ SyncDataManager.stopWatchService(serviceName);
+ }
+
+ return result;
+ }
+
+ // when:
+ // 1)service had been deleted
+ // 2)service Health check was not passing
+ // single service return [],size==0
+ // stop this service watching task and create delete event
+ private boolean check(ConsulResponse<List<ServiceHealth>> object) {
+ boolean result = true;
+
+ if (object == null || object.getResponse() == null
+ || object.getResponse().size() == 0) {
+ LOGGER.info("check service-{},its data is empty",
+ serviceName);
+ return false;
+ }
+
+ return result;
+ }
+
+ private void writeServiceToQueue4Del() {
+ ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>();
+ data.setDataType(ServiceData.DataType.service);
+ data.setOperate(ServiceData.Operate.delete);
+
+ // tell the subsequent operation the service name which will be deleted
+ Service service = ImmutableService.builder().id("").port(0).address("")
+ .service(serviceName).addTags("").createIndex(0).modifyIndex(0).build();
+ ServiceHealth serviceHealth = ImmutableServiceHealth.builder()
+ .service(service)
+ .node(ImmutableNode.builder().node("").address("").build())
+ .build();
+ List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>();
+ serviceHealthList.add(serviceHealth);
+
+ data.setData(serviceHealthList);
+
+ LOGGER.info("put delete service["
+ + serviceName
+ + "] to service queue :because of deleted ");
+
+ try {
+ QueueManager.getInstance().putIn(data);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn(
+ "put delete service["
+ + serviceName
+ + "] to service queue interrupted because of deleted:",
+ e);
+ }
+ }
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java
new file mode 100644
index 0000000..311edce
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java
@@ -0,0 +1,96 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.msb.apiroute.SyncDataManager;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
+import org.onap.msb.apiroute.wrapper.queue.QueueManager;
+import org.onap.msb.apiroute.wrapper.queue.ServiceData;
+import org.onap.msb.apiroute.wrapper.queue.ServiceData.Operate;
+import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.model.ConsulResponse;
+
+public class CheckTagAndAutoStopWatchFilter implements
+ WatchTask.Filter<List<ServiceHealth>> {
+
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(CheckTagAndAutoStopWatchFilter.class);
+
+ private final String serviceName;
+
+ public CheckTagAndAutoStopWatchFilter(final String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ // from consul,the response data:List<ServiceHealth>
+ // filter ServiceHealth list and find the ServiceHealths which satisfy the
+ // tags conditions
+ // 1)if all ServiceHealth don't satisfy,create delete event and stop watch
+ // 2)if have some ServiceHealths satisfy the tags conditions,create update
+ // event and send these ServiceHealths
+ @Override
+ public boolean filter(ConsulResponse<List<ServiceHealth>> object) {
+ // TODO Auto-generated method stub
+
+ // find #ServiceHealth# which satisfy the tag conditions
+ List<ServiceHealth> satisfyList = getSatisfyList(object);
+
+ // no satisfied ServiceHealth
+ if (satisfyList.isEmpty()) {
+
+ LOGGER.info("put delete service["
+ + serviceName
+ + "] to service queue :because of NO tag meet the conditions");
+
+ // create delete
+ writeServiceToQueue(object.getResponse(),
+ ServiceData.Operate.delete);
+ // stop watch
+ //SyncDataManager.stopWatchService(serviceName);
+ return false;
+ }
+
+ LOGGER.info("put update service["
+ + serviceName
+ + "] to service queue :which tags meet the conditions");
+
+ // put the satisfy list to queue
+ writeServiceToQueue(satisfyList, ServiceData.Operate.update);
+
+ return true;
+ }
+
+ private List<ServiceHealth> getSatisfyList(
+ ConsulResponse<List<ServiceHealth>> object) {
+ List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>();
+ for (ServiceHealth health : object.getResponse()) {
+
+ if (ServiceFilter.getInstance().isFilterCheck(health)) {
+ satisfyList.add(health);
+ }
+ }
+
+ return satisfyList;
+ }
+
+ private void writeServiceToQueue(List<ServiceHealth> serviceData,
+ Operate operate) {
+ ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>();
+ data.setOperate(operate);
+ data.setDataType(ServiceData.DataType.service);
+ data.setData(serviceData);
+
+
+ try {
+ QueueManager.getInstance().putIn(data);
+ } catch (InterruptedException e) {
+ LOGGER.warn("put " + operate + " service[" + serviceName
+ + "] to service queue interrupted ", e);
+ }
+
+ }
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java
new file mode 100644
index 0000000..14ab2c7
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java
@@ -0,0 +1,48 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.model.ConsulResponse;
+
+public class ConsulIndexFilter<T> implements WatchTask.Filter<T> {
+
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(ConsulIndexFilter.class);
+
+ private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(
+ null);
+
+ @Override
+ public boolean filter(final ConsulResponse<T> object) {
+ // TODO Auto-generated method stub
+ return isChanged(object);
+ }
+
+ private boolean isChanged(final ConsulResponse<T> consulResponse) {
+
+ if (consulResponse != null && consulResponse.getIndex() != null
+ && !consulResponse.getIndex().equals(latestIndex.get())) {
+
+ if(LOGGER.isDebugEnabled()){
+ //第一次不打印
+ if (latestIndex.get()!=null) {
+ LOGGER.debug("consul index compare:new-"
+ + consulResponse.getIndex() + " old-"
+ + latestIndex.get());
+ }
+
+ }
+
+ this.latestIndex.set(consulResponse.getIndex());
+ return true;
+ }
+
+ return false;
+ }
+
+
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java
new file mode 100644
index 0000000..38fb7c6
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java
@@ -0,0 +1,121 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.Service;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
+import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.health.HealthCheck;
+
+public class ServiceModifyIndexFilter implements WatchTask.Filter<List<ServiceHealth>> {
+
+ private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse =
+ new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of());
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class);
+
+
+ @Override
+ public boolean filter(ConsulResponse<List<ServiceHealth>> object) {
+ // TODO Auto-generated method stub
+
+ List<ServiceHealth> newList=object.getResponse();
+ if(realFilter(newList)){
+ lastResponse.set(ImmutableList.copyOf(newList));
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean realFilter(List<ServiceHealth> newList) {
+ // 1)判断list的size,不等则改变
+ if (newList.size() != lastResponse.get().size()) {
+ // 第一次不打印
+ if (lastResponse.get().size() != 0) {
+ LOGGER.info(newList.get(0).getService().getService()
+ + " instance count is different.new_count:" + newList.size() + " old_count:"
+ + lastResponse.get().size());
+ }
+
+ return true;
+ }
+
+
+ // 2)循环服务实例判断服务内容和健康检查是否改变
+ for (ServiceHealth newData : newList) {
+ ServiceHealth sameIdOldData = findSameIdInOldList(newData);
+ // 若在oldlist中不存在,则改变
+ if (sameIdOldData == null) {
+
+ LOGGER.info(newData.getService().getId()
+ + " is a new service instance.the createindex:"
+ + newData.getService().getCreateIndex()
+ + " the modifyIndex:"
+ + newData.getService().getModifyIndex());
+
+ return true;
+ }
+
+ // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变
+ if(!compareService(newData,sameIdOldData)){
+ LOGGER.info(newData.getService().getId() +" instance is change because of modifyIndex or health check" );
+ return true;
+ }
+ }
+
+ return false;
+
+
+ }
+
+
+ private boolean compareService(ServiceHealth oldData,ServiceHealth newData) {
+
+ return compareServiceInfo(oldData.getService(),newData.getService()) &&
+ compareServiceHealthStatus(oldData.getChecks(),newData.getChecks());
+
+ }
+
+
+
+ private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) {
+ if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) {
+ LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:"
+ + newServiceInfo.getModifyIndex() + " old_modifyIndex:"
+ + oldServiceInfo.getModifyIndex());
+ return false;
+ }
+ return true;
+ }
+
+ private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) {
+ boolean oldHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(oldData);
+ boolean newHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(newData);
+ return oldHealthCheck==newHealthCheck;
+
+ }
+
+
+ private ServiceHealth findSameIdInOldList(ServiceHealth newData) {
+ for (ServiceHealth oldData : lastResponse.get()) {
+ if (oldData.getService().getId().equals(newData.getService().getId())) {
+ return oldData;
+ }
+ }
+
+ return null;
+ }
+
+ public boolean resetModifyIndex() {
+ // clear last response
+ lastResponse.set(ImmutableList.<ServiceHealth>of());
+ return true;
+ }
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java
new file mode 100644
index 0000000..c21295f
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java
@@ -0,0 +1,90 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import org.apache.http.HttpEntity;
+import org.onap.msb.apiroute.wrapper.consulextend.CatalogClient;
+import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache;
+import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.option.CatalogOptions;
+import com.orbitz.consul.option.QueryOptions;
+
+public class WatchCatalogServicesTask extends WatchTask<HttpEntity> {
+
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WatchCatalogServicesTask.class);
+
+ private ServicesCatalogCache servicesCache = null;
+
+ public WatchCatalogServicesTask(
+ final CatalogClient catalogClient,
+ final CatalogOptions catalogOptions,
+ final QueryOptions queryOptions,
+ final int watchSeconds)
+ {
+ initCache(catalogClient,catalogOptions,queryOptions,watchSeconds);
+ }
+
+ public WatchCatalogServicesTask(
+ final CatalogClient catalogClient,
+ final int watchSeconds)
+ {
+ initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,watchSeconds);
+ }
+
+ public WatchCatalogServicesTask(
+ final CatalogClient catalogClient)
+ {
+ initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,10);
+ }
+
+ private ServicesCatalogCache initCache(final CatalogClient catalogClient,
+ final CatalogOptions catalogOptions,
+ final QueryOptions queryOptions,
+ final int watchSeconds) {
+ LOGGER.info("************create all services watch task*****************");
+ servicesCache = ServicesCatalogCache.newCache(catalogClient,
+ catalogOptions, queryOptions, watchSeconds);
+
+ servicesCache
+ .addListener((Listener<HttpEntity>) new InternalListener());
+
+ return servicesCache;
+ }
+
+ @Override
+ public boolean startWatch() {
+ // TODO Auto-generated method stub
+ if(servicesCache!=null)
+ {
+ try {
+ servicesCache.start();
+ LOGGER.info("************start all services watch task*****************");
+ return true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn("start service list watch failed:", e);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean stopWatch() {
+ // TODO Auto-generated method stub
+ if (servicesCache != null) {
+ try {
+ servicesCache.stop();
+ LOGGER.info("************stop all services watch task*****************");
+ return true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn("stop service list watch failed:", e);
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java
new file mode 100644
index 0000000..b0d64a7
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java
@@ -0,0 +1,128 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.onap.msb.apiroute.wrapper.consulextend.HealthClient;
+import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache;
+import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener;
+import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.option.CatalogOptions;
+import com.orbitz.consul.option.QueryOptions;
+
+public class WatchServiceHealthTask extends WatchTask<List<ServiceHealth>> {
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WatchServiceHealthTask.class);
+
+ private ServiceHealthCache serviceHealthCache = null;
+ private String serviceName="";
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public WatchServiceHealthTask(final HealthClient healthClient,
+ final String serviceName,final boolean passing,
+ final CatalogOptions catalogOptions, final int watchSeconds,
+ final QueryOptions queryOptions) {
+ initCache(healthClient, serviceName, passing, catalogOptions,
+ watchSeconds, queryOptions);
+ }
+
+ public WatchServiceHealthTask(final HealthClient healthClient,
+ final String serviceName,final boolean passing,
+ final int watchSeconds)
+
+ {
+ initCache(healthClient, serviceName, passing, CatalogOptions.BLANK,
+ watchSeconds, QueryOptions.BLANK);
+ }
+
+ public WatchServiceHealthTask(final HealthClient healthClient,
+ final String serviceName, final int watchSeconds)
+
+ {
+ initCache(healthClient, serviceName, true, CatalogOptions.BLANK,
+ watchSeconds, QueryOptions.BLANK);
+ }
+
+ private ServiceHealthCache initCache(final HealthClient healthClient,
+ final String serviceName,final boolean passing,
+ final CatalogOptions catalogOptions, final int watchSeconds,
+ final QueryOptions queryOptions) {
+// LOGGER.info("************create {} watch task*****************",serviceName);
+ this.serviceName = serviceName;
+ serviceHealthCache = ServiceHealthCache.newCache(healthClient,
+ serviceName, passing, catalogOptions, watchSeconds,
+ queryOptions);
+
+ serviceHealthCache
+ .addListener((Listener<List<ServiceHealth>>) new InternalListener());
+
+ return serviceHealthCache;
+ }
+
+ public boolean startWatch() {
+
+ if(serviceHealthCache!=null)
+ {
+ try {
+ serviceHealthCache.start();
+ LOGGER.info("************start {} watch task*****************",serviceName);
+ return true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn("start service watch failed:", e);
+ }
+ }
+
+ return false;
+
+ }
+
+ public boolean stopWatch(){
+ if (serviceHealthCache != null) {
+ try {
+ serviceHealthCache.stop();
+ LOGGER.info("************stop {} watch task*****************",serviceName);
+ return true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn("stop service watch failed:", e);
+ }
+ }
+
+ return false;
+ }
+
+
+ public boolean resetIndex()
+ {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("reset " + serviceName + " consul index");
+ }
+
+ //reset consul index
+ serviceHealthCache.updateIndex(BigInteger.valueOf(0));
+
+
+ //reset modify index
+ for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) {
+ if (filter instanceof ServiceModifyIndexFilter) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("reset " + serviceName + " modify index");
+ }
+ return ((ServiceModifyIndexFilter) filter).resetModifyIndex();
+ }
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("reset modify index.did not find filter:" + serviceName);
+ }
+
+ return false;
+ }
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java
new file mode 100644
index 0000000..2ada19d
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java
@@ -0,0 +1,87 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.model.ConsulResponse;
+
+public abstract class WatchTask<T> {
+ private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>();
+ private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>();
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WatchTask.class);
+
+ //start
+ public abstract boolean startWatch();
+
+ //stop
+ public abstract boolean stopWatch();
+
+ // filters
+ public interface Filter<T> {
+ public boolean filter(final ConsulResponse<T> object);
+ }
+
+ public boolean addFilter(Filter<T> filter) {
+ boolean added = filters.add(filter);
+ return added;
+ }
+
+ public void removeAllFilter() {
+ filters.clear();
+ }
+
+
+ public final CopyOnWriteArrayList<Filter<T>> getAllFilters(){
+ return filters;
+ }
+
+ // handlers
+ public interface Handler<T> {
+ void handle(final ConsulResponse<T> object);
+ }
+
+ public boolean addHandler(Handler<T> handler) {
+ boolean added = handlers.add(handler);
+ return added;
+ }
+
+ public void removeAllHandler() {
+ handlers.clear();
+ }
+
+ // internal listener
+ protected class InternalListener implements ConsulCache.Listener<T> {
+ @Override
+ public void notify(ConsulResponse<T> newValues) {
+
+ long startTime = System.currentTimeMillis();
+
+ // filter
+ for (Filter<T> f : filters) {
+ // false,return
+ if (!f.filter(newValues)) {
+ return;
+ }
+ }
+
+ // handle
+ for (Handler<T> h : handlers) {
+ h.handle(newValues);
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ if(endTime-startTime > 10*1000)
+ {
+ LOGGER.info("WatchTask THEAD WORK TIMEOUT");
+ }
+ }
+
+ }
+
+
+}
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java
new file mode 100644
index 0000000..6df10e3
--- /dev/null
+++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java
@@ -0,0 +1,36 @@
+package org.onap.msb.apiroute.wrapper.consulextend.expose;
+
+import org.onap.msb.apiroute.wrapper.queue.QueueManager;
+import org.onap.msb.apiroute.wrapper.queue.ServiceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.orbitz.consul.model.ConsulResponse;
+
+public class WriteBufferHandler<T> implements WatchTask.Handler<T> {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(WriteBufferHandler.class);
+ private final ServiceData.DataType dataType;
+
+
+ public WriteBufferHandler(final ServiceData.DataType dataType) {
+ this.dataType =dataType;
+ }
+
+ @Override
+ public void handle(ConsulResponse<T> object) {
+ // TODO Auto-generated method stub
+ ServiceData<T> data = new ServiceData<T>();
+ data.setDataType(dataType);
+ data.setData(object.getResponse());
+
+ try {
+ QueueManager.getInstance().putIn(data);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ LOGGER.warn("put data to buffer interrupted:", e);
+ }
+ }
+
+}