summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java150
1 files changed, 74 insertions, 76 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
index 6098142e..5dcd9189 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* slice-analysis-ms
* ================================================================================
- * Copyright (C) 2020-2021 Wipro Limited.
+ * Copyright (C) 2020-2022 Wipro Limited.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,95 +24,93 @@ package org.onap.slice.analysis.ms.service;
import java.util.List;
import java.util.Objects;
-import org.onap.slice.analysis.ms.configdb.CpsInterface;
import org.onap.slice.analysis.ms.configdb.IConfigDbService;
+import org.onap.slice.analysis.ms.cps.CpsInterface;
import org.onap.slice.analysis.ms.models.Configuration;
import org.onap.slice.analysis.ms.models.SubCounter;
import org.onap.slice.analysis.ms.utils.BeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
+/**
* This Thread class consumes message from pm data queue and sends onset message to policy
*/
public class ConsumerThread extends Thread {
- private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
- private PmDataQueue pmDataQueue;
- private IConfigDbService configDbService;
- private SnssaiSamplesProcessor snssaiSamplesProcessor;
- private CpsInterface cpsInterface;
- private long initialDelaySec;
- private int samples;
+ private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
+ private PmDataQueue pmDataQueue;
+ private IConfigDbService configDbService;
+ private SnssaiSamplesProcessor snssaiSamplesProcessor;
+ private CpsInterface cpsInterface;
+ private long initialDelaySec;
+ private int samples;
- /**
- * Default constructor.
- */
- public ConsumerThread() {
- super();
- this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
- this.configDbService = BeanUtil.getBean(IConfigDbService.class);
- this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
- this.samples = Configuration.getInstance().getSamples();
- this.cpsInterface = BeanUtil.getBean(CpsInterface.class);
- }
+ /**
+ * Default constructor.
+ */
+ public ConsumerThread() {
+ super();
+ this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
+ this.configDbService = BeanUtil.getBean(IConfigDbService.class);
+ this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
+ this.samples = Configuration.getInstance().getSamples();
+ this.cpsInterface = BeanUtil.getBean(CpsInterface.class);
+ }
- /**
- * Consumes data from PM data queue, process the data and sends onset message to policy if needed
- */
- @Override
- public void run() {
- Boolean isConfigDbEnabled = (Objects.isNull(Configuration.getInstance().getConfigDbEnabled())) ? true
- : Configuration.getInstance().getConfigDbEnabled();
- boolean done = false;
- boolean result = false;
- String snssai = "";
- List<String> nfs = null;
- while (!done) {
- try {
- Thread.sleep(initialDelaySec);
- log.info("Starting Consumer Thread");
- snssai = pmDataQueue.getSnnsaiFromQueue();
- if (!snssai.equals("")) {
- log.info("Consumer thread processing data for s-nssai {}",snssai);
- try {
- if (isConfigDbEnabled) {
- nfs = configDbService.fetchNetworkFunctionsOfSnssai(snssai);
- }
- else {
- nfs = cpsInterface.fetchNetworkFunctionsOfSnssai(snssai);
- }
- }
- catch(Exception e) {
- pmDataQueue.putSnssaiToQueue(snssai);
- log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage());
- }
- if(nfs != null && checkForEnoughSamples(nfs, snssai)) {
- this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
- result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs);
- if(!result) {
- log.info("Not enough samples to process for {}",snssai);
- pmDataQueue.putSnssaiToQueue(snssai);
- }
- }
- }
- } catch (Exception e) {
- log.error("Exception in Consumer Thread, {}", e.getMessage());
- done = true;
- }
- }
- }
+ /**
+ * Consumes data from PM data queue, process the data and sends onset message to policy if needed
+ */
+ @Override
+ public void run() {
+ Boolean isConfigDbEnabled = (Objects.isNull(Configuration.getInstance().getConfigDbEnabled())) ? true
+ : Configuration.getInstance().getConfigDbEnabled();
+ boolean done = false;
+ boolean result = false;
+ String snssai = "";
+ List<String> nfs = null;
+ while (!done) {
+ try {
+ Thread.sleep(initialDelaySec);
+ log.info("Starting Consumer Thread");
+ snssai = pmDataQueue.getSnnsaiFromQueue();
+ if (!snssai.equals("")) {
+ log.info("Consumer thread processing data for s-nssai {}", snssai);
+ try {
+ if (isConfigDbEnabled) {
+ nfs = configDbService.fetchNetworkFunctionsOfSnssai(snssai);
+ } else {
+ nfs = cpsInterface.fetchNetworkFunctionsOfSnssai(snssai);
+ }
+ } catch (Exception e) {
+ pmDataQueue.putSnssaiToQueue(snssai);
+ log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage());
+ }
+ if (nfs != null && checkForEnoughSamples(nfs, snssai)) {
+ this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
+ result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs);
+ if (!result) {
+ log.info("Not enough samples to process for {}", snssai);
+ pmDataQueue.putSnssaiToQueue(snssai);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception in Consumer Thread, {}", e.getMessage());
+ done = true;
+ }
+ }
+ }
/**
* Checks whether enough samples are available for the network functions
*/
- public boolean checkForEnoughSamples(List<String> nfs, String snssai) {
- for(String nf : nfs) {
- if(! pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) {
- log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai);
- pmDataQueue.putSnssaiToQueue(snssai);
- return false;
- }
- }
- return true;
- }
+ public boolean checkForEnoughSamples(List<String> nfs, String snssai) {
+ for (String nf : nfs) {
+ if (!pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) {
+ log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai);
+ pmDataQueue.putSnssaiToQueue(snssai);
+ return false;
+ }
+ }
+ return true;
+ }
}