diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose')
8 files changed, 704 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java new file mode 100644 index 0000000..01b5168 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java @@ -0,0 +1,98 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.msb.apiroute.SyncDataManager; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ImmutableService; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ImmutableServiceHealth; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.Service; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.onap.msb.apiroute.wrapper.queue.QueueManager; +import org.onap.msb.apiroute.wrapper.queue.ServiceData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.health.ImmutableNode; + + + +public class CheckServiceDataEmptyAndAutoStopWatchFilter implements + WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory + .getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class); + private final String serviceName; + + public CheckServiceDataEmptyAndAutoStopWatchFilter( + final String serviceName) { + this.serviceName = serviceName; + } + + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + boolean result = check(object); + + if (!result) { + // create delete + writeServiceToQueue4Del(); + // stop watch + SyncDataManager.stopWatchService(serviceName); + } + + return result; + } + + // when: + // 1)service had been deleted + // 2)service Health check was not passing + // single service return [],size==0 + // stop this service watching task and create delete event + private boolean check(ConsulResponse<List<ServiceHealth>> object) { + boolean result = true; + + if (object == null || object.getResponse() == null + || object.getResponse().size() == 0) { + LOGGER.info("check service-{},its data is empty", + serviceName); + return false; + } + + return result; + } + + private void writeServiceToQueue4Del() { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setDataType(ServiceData.DataType.service); + data.setOperate(ServiceData.Operate.delete); + + // tell the subsequent operation the service name which will be deleted + Service service = ImmutableService.builder().id("").port(0).address("") + .service(serviceName).addTags("").createIndex(0).modifyIndex(0).build(); + ServiceHealth serviceHealth = ImmutableServiceHealth.builder() + .service(service) + .node(ImmutableNode.builder().node("").address("").build()) + .build(); + List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>(); + serviceHealthList.add(serviceHealth); + + data.setData(serviceHealthList); + + LOGGER.info("put delete service[" + + serviceName + + "] to service queue :because of deleted "); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn( + "put delete service[" + + serviceName + + "] to service queue interrupted because of deleted:", + e); + } + } +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java new file mode 100644 index 0000000..311edce --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java @@ -0,0 +1,96 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.msb.apiroute.SyncDataManager; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.onap.msb.apiroute.wrapper.queue.QueueManager; +import org.onap.msb.apiroute.wrapper.queue.ServiceData; +import org.onap.msb.apiroute.wrapper.queue.ServiceData.Operate; +import org.onap.msb.apiroute.wrapper.util.ServiceFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.model.ConsulResponse; + +public class CheckTagAndAutoStopWatchFilter implements + WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory + .getLogger(CheckTagAndAutoStopWatchFilter.class); + + private final String serviceName; + + public CheckTagAndAutoStopWatchFilter(final String serviceName) { + this.serviceName = serviceName; + } + + // from consul,the response data:List<ServiceHealth> + // filter ServiceHealth list and find the ServiceHealths which satisfy the + // tags conditions + // 1)if all ServiceHealth don't satisfy,create delete event and stop watch + // 2)if have some ServiceHealths satisfy the tags conditions,create update + // event and send these ServiceHealths + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + + // find #ServiceHealth# which satisfy the tag conditions + List<ServiceHealth> satisfyList = getSatisfyList(object); + + // no satisfied ServiceHealth + if (satisfyList.isEmpty()) { + + LOGGER.info("put delete service[" + + serviceName + + "] to service queue :because of NO tag meet the conditions"); + + // create delete + writeServiceToQueue(object.getResponse(), + ServiceData.Operate.delete); + // stop watch + //SyncDataManager.stopWatchService(serviceName); + return false; + } + + LOGGER.info("put update service[" + + serviceName + + "] to service queue :which tags meet the conditions"); + + // put the satisfy list to queue + writeServiceToQueue(satisfyList, ServiceData.Operate.update); + + return true; + } + + private List<ServiceHealth> getSatisfyList( + ConsulResponse<List<ServiceHealth>> object) { + List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>(); + for (ServiceHealth health : object.getResponse()) { + + if (ServiceFilter.getInstance().isFilterCheck(health)) { + satisfyList.add(health); + } + } + + return satisfyList; + } + + private void writeServiceToQueue(List<ServiceHealth> serviceData, + Operate operate) { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setOperate(operate); + data.setDataType(ServiceData.DataType.service); + data.setData(serviceData); + + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + LOGGER.warn("put " + operate + " service[" + serviceName + + "] to service queue interrupted ", e); + } + + } +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java new file mode 100644 index 0000000..14ab2c7 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java @@ -0,0 +1,48 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.model.ConsulResponse; + +public class ConsulIndexFilter<T> implements WatchTask.Filter<T> { + + private final static Logger LOGGER = LoggerFactory + .getLogger(ConsulIndexFilter.class); + + private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>( + null); + + @Override + public boolean filter(final ConsulResponse<T> object) { + // TODO Auto-generated method stub + return isChanged(object); + } + + private boolean isChanged(final ConsulResponse<T> consulResponse) { + + if (consulResponse != null && consulResponse.getIndex() != null + && !consulResponse.getIndex().equals(latestIndex.get())) { + + if(LOGGER.isDebugEnabled()){ + //第一次不打印 + if (latestIndex.get()!=null) { + LOGGER.debug("consul index compare:new-" + + consulResponse.getIndex() + " old-" + + latestIndex.get()); + } + + } + + this.latestIndex.set(consulResponse.getIndex()); + return true; + } + + return false; + } + + +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java new file mode 100644 index 0000000..38fb7c6 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java @@ -0,0 +1,121 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.onap.msb.apiroute.wrapper.consulextend.model.health.Service; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.onap.msb.apiroute.wrapper.util.ServiceFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.health.HealthCheck; + +public class ServiceModifyIndexFilter implements WatchTask.Filter<List<ServiceHealth>> { + + private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse = + new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of()); + + private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class); + + + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + + List<ServiceHealth> newList=object.getResponse(); + if(realFilter(newList)){ + lastResponse.set(ImmutableList.copyOf(newList)); + return true; + } + + return false; + } + + private boolean realFilter(List<ServiceHealth> newList) { + // 1)判断list的size,不等则改变 + if (newList.size() != lastResponse.get().size()) { + // 第一次不打印 + if (lastResponse.get().size() != 0) { + LOGGER.info(newList.get(0).getService().getService() + + " instance count is different.new_count:" + newList.size() + " old_count:" + + lastResponse.get().size()); + } + + return true; + } + + + // 2)循环服务实例判断服务内容和健康检查是否改变 + for (ServiceHealth newData : newList) { + ServiceHealth sameIdOldData = findSameIdInOldList(newData); + // 若在oldlist中不存在,则改变 + if (sameIdOldData == null) { + + LOGGER.info(newData.getService().getId() + + " is a new service instance.the createindex:" + + newData.getService().getCreateIndex() + + " the modifyIndex:" + + newData.getService().getModifyIndex()); + + return true; + } + + // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变 + if(!compareService(newData,sameIdOldData)){ + LOGGER.info(newData.getService().getId() +" instance is change because of modifyIndex or health check" ); + return true; + } + } + + return false; + + + } + + + private boolean compareService(ServiceHealth oldData,ServiceHealth newData) { + + return compareServiceInfo(oldData.getService(),newData.getService()) && + compareServiceHealthStatus(oldData.getChecks(),newData.getChecks()); + + } + + + + private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) { + if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) { + LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:" + + newServiceInfo.getModifyIndex() + " old_modifyIndex:" + + oldServiceInfo.getModifyIndex()); + return false; + } + return true; + } + + private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) { + boolean oldHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(oldData); + boolean newHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(newData); + return oldHealthCheck==newHealthCheck; + + } + + + private ServiceHealth findSameIdInOldList(ServiceHealth newData) { + for (ServiceHealth oldData : lastResponse.get()) { + if (oldData.getService().getId().equals(newData.getService().getId())) { + return oldData; + } + } + + return null; + } + + public boolean resetModifyIndex() { + // clear last response + lastResponse.set(ImmutableList.<ServiceHealth>of()); + return true; + } +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java new file mode 100644 index 0000000..c21295f --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java @@ -0,0 +1,90 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import org.apache.http.HttpEntity; +import org.onap.msb.apiroute.wrapper.consulextend.CatalogClient; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.option.CatalogOptions; +import com.orbitz.consul.option.QueryOptions; + +public class WatchCatalogServicesTask extends WatchTask<HttpEntity> { + + private final static Logger LOGGER = LoggerFactory + .getLogger(WatchCatalogServicesTask.class); + + private ServicesCatalogCache servicesCache = null; + + public WatchCatalogServicesTask( + final CatalogClient catalogClient, + final CatalogOptions catalogOptions, + final QueryOptions queryOptions, + final int watchSeconds) + { + initCache(catalogClient,catalogOptions,queryOptions,watchSeconds); + } + + public WatchCatalogServicesTask( + final CatalogClient catalogClient, + final int watchSeconds) + { + initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,watchSeconds); + } + + public WatchCatalogServicesTask( + final CatalogClient catalogClient) + { + initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,10); + } + + private ServicesCatalogCache initCache(final CatalogClient catalogClient, + final CatalogOptions catalogOptions, + final QueryOptions queryOptions, + final int watchSeconds) { + LOGGER.info("************create all services watch task*****************"); + servicesCache = ServicesCatalogCache.newCache(catalogClient, + catalogOptions, queryOptions, watchSeconds); + + servicesCache + .addListener((Listener<HttpEntity>) new InternalListener()); + + return servicesCache; + } + + @Override + public boolean startWatch() { + // TODO Auto-generated method stub + if(servicesCache!=null) + { + try { + servicesCache.start(); + LOGGER.info("************start all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service list watch failed:", e); + } + } + + return false; + } + + @Override + public boolean stopWatch() { + // TODO Auto-generated method stub + if (servicesCache != null) { + try { + servicesCache.stop(); + LOGGER.info("************stop all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service list watch failed:", e); + } + } + return false; + } + +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java new file mode 100644 index 0000000..b0d64a7 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java @@ -0,0 +1,128 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.math.BigInteger; +import java.util.List; + +import org.onap.msb.apiroute.wrapper.consulextend.HealthClient; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.option.CatalogOptions; +import com.orbitz.consul.option.QueryOptions; + +public class WatchServiceHealthTask extends WatchTask<List<ServiceHealth>> { + private final static Logger LOGGER = LoggerFactory + .getLogger(WatchServiceHealthTask.class); + + private ServiceHealthCache serviceHealthCache = null; + private String serviceName=""; + + public String getServiceName() { + return serviceName; + } + + public WatchServiceHealthTask(final HealthClient healthClient, + final String serviceName,final boolean passing, + final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { + initCache(healthClient, serviceName, passing, catalogOptions, + watchSeconds, queryOptions); + } + + public WatchServiceHealthTask(final HealthClient healthClient, + final String serviceName,final boolean passing, + final int watchSeconds) + + { + initCache(healthClient, serviceName, passing, CatalogOptions.BLANK, + watchSeconds, QueryOptions.BLANK); + } + + public WatchServiceHealthTask(final HealthClient healthClient, + final String serviceName, final int watchSeconds) + + { + initCache(healthClient, serviceName, true, CatalogOptions.BLANK, + watchSeconds, QueryOptions.BLANK); + } + + private ServiceHealthCache initCache(final HealthClient healthClient, + final String serviceName,final boolean passing, + final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { +// LOGGER.info("************create {} watch task*****************",serviceName); + this.serviceName = serviceName; + serviceHealthCache = ServiceHealthCache.newCache(healthClient, + serviceName, passing, catalogOptions, watchSeconds, + queryOptions); + + serviceHealthCache + .addListener((Listener<List<ServiceHealth>>) new InternalListener()); + + return serviceHealthCache; + } + + public boolean startWatch() { + + if(serviceHealthCache!=null) + { + try { + serviceHealthCache.start(); + LOGGER.info("************start {} watch task*****************",serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service watch failed:", e); + } + } + + return false; + + } + + public boolean stopWatch(){ + if (serviceHealthCache != null) { + try { + serviceHealthCache.stop(); + LOGGER.info("************stop {} watch task*****************",serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service watch failed:", e); + } + } + + return false; + } + + + public boolean resetIndex() + { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " consul index"); + } + + //reset consul index + serviceHealthCache.updateIndex(BigInteger.valueOf(0)); + + + //reset modify index + for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) { + if (filter instanceof ServiceModifyIndexFilter) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " modify index"); + } + return ((ServiceModifyIndexFilter) filter).resetModifyIndex(); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find filter:" + serviceName); + } + + return false; + } +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java new file mode 100644 index 0000000..2ada19d --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java @@ -0,0 +1,87 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import java.util.concurrent.CopyOnWriteArrayList; + +import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.model.ConsulResponse; + +public abstract class WatchTask<T> { + private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>(); + private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>(); + private final static Logger LOGGER = LoggerFactory + .getLogger(WatchTask.class); + + //start + public abstract boolean startWatch(); + + //stop + public abstract boolean stopWatch(); + + // filters + public interface Filter<T> { + public boolean filter(final ConsulResponse<T> object); + } + + public boolean addFilter(Filter<T> filter) { + boolean added = filters.add(filter); + return added; + } + + public void removeAllFilter() { + filters.clear(); + } + + + public final CopyOnWriteArrayList<Filter<T>> getAllFilters(){ + return filters; + } + + // handlers + public interface Handler<T> { + void handle(final ConsulResponse<T> object); + } + + public boolean addHandler(Handler<T> handler) { + boolean added = handlers.add(handler); + return added; + } + + public void removeAllHandler() { + handlers.clear(); + } + + // internal listener + protected class InternalListener implements ConsulCache.Listener<T> { + @Override + public void notify(ConsulResponse<T> newValues) { + + long startTime = System.currentTimeMillis(); + + // filter + for (Filter<T> f : filters) { + // false,return + if (!f.filter(newValues)) { + return; + } + } + + // handle + for (Handler<T> h : handlers) { + h.handle(newValues); + } + + long endTime = System.currentTimeMillis(); + + if(endTime-startTime > 10*1000) + { + LOGGER.info("WatchTask THEAD WORK TIMEOUT"); + } + } + + } + + +} diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java new file mode 100644 index 0000000..6df10e3 --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java @@ -0,0 +1,36 @@ +package org.onap.msb.apiroute.wrapper.consulextend.expose; + +import org.onap.msb.apiroute.wrapper.queue.QueueManager; +import org.onap.msb.apiroute.wrapper.queue.ServiceData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orbitz.consul.model.ConsulResponse; + +public class WriteBufferHandler<T> implements WatchTask.Handler<T> { + + private static final Logger LOGGER = LoggerFactory + .getLogger(WriteBufferHandler.class); + private final ServiceData.DataType dataType; + + + public WriteBufferHandler(final ServiceData.DataType dataType) { + this.dataType =dataType; + } + + @Override + public void handle(ConsulResponse<T> object) { + // TODO Auto-generated method stub + ServiceData<T> data = new ServiceData<T>(); + data.setDataType(dataType); + data.setData(object.getResponse()); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn("put data to buffer interrupted:", e); + } + } + +} |