From f67adad42f2c857ff76b52df39f0beb5c2cabee4 Mon Sep 17 00:00:00 2001 From: Niranjana Date: Thu, 3 Feb 2022 04:50:38 +0000 Subject: [DCAEGEN2] Calculate slice utilization data Issue-ID: DCAEGEN2-2942 Signed-off-by: Niranjana Change-Id: Id16d2a36cf964b15495531a54094cad96471bdcb --- .../slice/analysis/ms/service/ConsumerThread.java | 150 ++++++++++----------- 1 file changed, 74 insertions(+), 76 deletions(-) (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java') diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java index 6098142e..5dcd9189 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * slice-analysis-ms * ================================================================================ - * Copyright (C) 2020-2021 Wipro Limited. + * Copyright (C) 2020-2022 Wipro Limited. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,95 +24,93 @@ package org.onap.slice.analysis.ms.service; import java.util.List; import java.util.Objects; -import org.onap.slice.analysis.ms.configdb.CpsInterface; import org.onap.slice.analysis.ms.configdb.IConfigDbService; +import org.onap.slice.analysis.ms.cps.CpsInterface; import org.onap.slice.analysis.ms.models.Configuration; import org.onap.slice.analysis.ms.models.SubCounter; import org.onap.slice.analysis.ms.utils.BeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** +/** * This Thread class consumes message from pm data queue and sends onset message to policy */ public class ConsumerThread extends Thread { - private static Logger log = LoggerFactory.getLogger(ConsumerThread.class); - private PmDataQueue pmDataQueue; - private IConfigDbService configDbService; - private SnssaiSamplesProcessor snssaiSamplesProcessor; - private CpsInterface cpsInterface; - private long initialDelaySec; - private int samples; + private static Logger log = LoggerFactory.getLogger(ConsumerThread.class); + private PmDataQueue pmDataQueue; + private IConfigDbService configDbService; + private SnssaiSamplesProcessor snssaiSamplesProcessor; + private CpsInterface cpsInterface; + private long initialDelaySec; + private int samples; - /** - * Default constructor. - */ - public ConsumerThread() { - super(); - this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class); - this.configDbService = BeanUtil.getBean(IConfigDbService.class); - this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds(); - this.samples = Configuration.getInstance().getSamples(); - this.cpsInterface = BeanUtil.getBean(CpsInterface.class); - } + /** + * Default constructor. + */ + public ConsumerThread() { + super(); + this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class); + this.configDbService = BeanUtil.getBean(IConfigDbService.class); + this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds(); + this.samples = Configuration.getInstance().getSamples(); + this.cpsInterface = BeanUtil.getBean(CpsInterface.class); + } - /** - * Consumes data from PM data queue, process the data and sends onset message to policy if needed - */ - @Override - public void run() { - Boolean isConfigDbEnabled = (Objects.isNull(Configuration.getInstance().getConfigDbEnabled())) ? true - : Configuration.getInstance().getConfigDbEnabled(); - boolean done = false; - boolean result = false; - String snssai = ""; - List nfs = null; - while (!done) { - try { - Thread.sleep(initialDelaySec); - log.info("Starting Consumer Thread"); - snssai = pmDataQueue.getSnnsaiFromQueue(); - if (!snssai.equals("")) { - log.info("Consumer thread processing data for s-nssai {}",snssai); - try { - if (isConfigDbEnabled) { - nfs = configDbService.fetchNetworkFunctionsOfSnssai(snssai); - } - else { - nfs = cpsInterface.fetchNetworkFunctionsOfSnssai(snssai); - } - } - catch(Exception e) { - pmDataQueue.putSnssaiToQueue(snssai); - log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage()); - } - if(nfs != null && checkForEnoughSamples(nfs, snssai)) { - this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class); - result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs); - if(!result) { - log.info("Not enough samples to process for {}",snssai); - pmDataQueue.putSnssaiToQueue(snssai); - } - } - } - } catch (Exception e) { - log.error("Exception in Consumer Thread, {}", e.getMessage()); - done = true; - } - } - } + /** + * Consumes data from PM data queue, process the data and sends onset message to policy if needed + */ + @Override + public void run() { + Boolean isConfigDbEnabled = (Objects.isNull(Configuration.getInstance().getConfigDbEnabled())) ? true + : Configuration.getInstance().getConfigDbEnabled(); + boolean done = false; + boolean result = false; + String snssai = ""; + List nfs = null; + while (!done) { + try { + Thread.sleep(initialDelaySec); + log.info("Starting Consumer Thread"); + snssai = pmDataQueue.getSnnsaiFromQueue(); + if (!snssai.equals("")) { + log.info("Consumer thread processing data for s-nssai {}", snssai); + try { + if (isConfigDbEnabled) { + nfs = configDbService.fetchNetworkFunctionsOfSnssai(snssai); + } else { + nfs = cpsInterface.fetchNetworkFunctionsOfSnssai(snssai); + } + } catch (Exception e) { + pmDataQueue.putSnssaiToQueue(snssai); + log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage()); + } + if (nfs != null && checkForEnoughSamples(nfs, snssai)) { + this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class); + result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs); + if (!result) { + log.info("Not enough samples to process for {}", snssai); + pmDataQueue.putSnssaiToQueue(snssai); + } + } + } + } catch (Exception e) { + log.error("Exception in Consumer Thread, {}", e.getMessage()); + done = true; + } + } + } /** * Checks whether enough samples are available for the network functions */ - public boolean checkForEnoughSamples(List nfs, String snssai) { - for(String nf : nfs) { - if(! pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) { - log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai); - pmDataQueue.putSnssaiToQueue(snssai); - return false; - } - } - return true; - } + public boolean checkForEnoughSamples(List nfs, String snssai) { + for (String nf : nfs) { + if (!pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) { + log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai); + pmDataQueue.putSnssaiToQueue(snssai); + return false; + } + } + return true; + } } -- cgit 1.2.3-korg