diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java new file mode 100644 index 0000000..6c179fc --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/queue/ServiceQueue.java @@ -0,0 +1,45 @@ +package org.onap.msb.apiroute.wrapper.queue; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ServiceQueue extends BaseQueue<List<ServiceHealth>> { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceQueue.class); + + private int queneNum; + + public ServiceQueue(final int queneNum,final int queueCapacity) { + super(queneNum,queueCapacity); + this.queneNum=queneNum; + } + + + @Override + public void put(final ServiceData<List<ServiceHealth>> data) throws InterruptedException { + if(data.getData()==null || data.getData().size()==0) return; + + String serviceName = data.getData().get(0).getService().getService(); + long serviceNameHashCode=serviceName.hashCode() & 0x7FFFFFFF; + int queneIndex=(int) (serviceNameHashCode % queneNum); + +// LOGGER.info("put ServiceQueue [serviceName.hashCode():"+serviceNameHashCode+",queneIndex:"+queneIndex+",queneNum:"+queneNum+"] :[serviceName]"+serviceName); + + BlockingQueue<ServiceData<List<ServiceHealth>>> queue=getQueue(queneIndex); + queue.put(data); + + LOGGER.info("put ServiceQueue[index:"+queneIndex+",size:"+queue.size()+"] success :[serviceName]"+serviceName); + } + + @Override + public ServiceData<List<ServiceHealth>> take(final int queueIndex) throws InterruptedException { + return getQueue(queueIndex).take(); + } + + +} |