summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java24
1 files changed, 21 insertions, 3 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
index 6868bc46..b676273f 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.onap.slice.analysis.ms.configdb.IConfigDbService;
import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.models.SubCounter;
import org.onap.slice.analysis.ms.utils.BeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ public class ConsumerThread extends Thread {
private IConfigDbService configDbService;
private SnssaiSamplesProcessor snssaiSamplesProcessor;
private long initialDelaySec;
+ private int samples;
/**
* Default constructor.
@@ -46,8 +48,8 @@ public class ConsumerThread extends Thread {
super();
this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
this.configDbService = BeanUtil.getBean(IConfigDbService.class);
- this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
+ this.samples = Configuration.getInstance().getSamples();
}
/**
@@ -56,12 +58,13 @@ public class ConsumerThread extends Thread {
@Override
public void run() {
boolean done = false;
- String snssai = "";
boolean result = false;
+ String snssai = "";
List<String> nfs = null;
while (!done) {
try {
Thread.sleep(initialDelaySec);
+ log.info("Starting Consumer Thread");
snssai = pmDataQueue.getSnnsaiFromQueue();
if (!snssai.equals("")) {
log.info("Consumer thread processing data for s-nssai {}",snssai);
@@ -72,7 +75,8 @@ public class ConsumerThread extends Thread {
pmDataQueue.putSnssaiToQueue(snssai);
log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage());
}
- if(nfs != null) {
+ if(nfs != null && checkForEnoughSamples(nfs, snssai)) {
+ this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs);
if(!result) {
log.info("Not enough samples to process for {}",snssai);
@@ -86,4 +90,18 @@ public class ConsumerThread extends Thread {
}
}
}
+
+ /**
+ * Checks whether enough samples are available for the network functions
+ */
+ public boolean checkForEnoughSamples(List<String> nfs, String snssai) {
+ for(String nf : nfs) {
+ if(! pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) {
+ log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai);
+ pmDataQueue.putSnssaiToQueue(snssai);
+ return false;
+ }
+ }
+ return true;
+ }
}