summaryrefslogtreecommitdiffstats
path: root/components/ml-prediction-ms/src/run.py
diff options
context:
space:
mode:
authorsendil kumar Jaya kumar <sendil.kumar@wipro.com>2022-03-03 15:15:05 +0530
committersendil kumar Jaya kumar <sendil.kumar@wipro.com>2023-03-31 12:32:53 +0530
commit93d9a28ba317475083a367d040ab606d6576fb03 (patch)
treec231446eb527ceb9397bce857c8b3dad10b617ce /components/ml-prediction-ms/src/run.py
parent5f69c24ad78121a2840b5299583791e557f8b535 (diff)
Add ml-prediction-ms with basic build infrastruct.
Added offline training code, along with ml slice prediction model, The main ml-prediction-ms component code changes are added to perform prediction on single slice. Issue-ID: DCAEGEN2-3067 Signed-off-by: sendil kumar Jaya kumar <sendil.kumar@wipro.com> Change-Id: Ia2e50dcbd97d03a96884a2bf5417edbf2fa04b4a
Diffstat (limited to 'components/ml-prediction-ms/src/run.py')
-rwxr-xr-xcomponents/ml-prediction-ms/src/run.py680
1 files changed, 680 insertions, 0 deletions
diff --git a/components/ml-prediction-ms/src/run.py b/components/ml-prediction-ms/src/run.py
new file mode 100755
index 00000000..752355d8
--- /dev/null
+++ b/components/ml-prediction-ms/src/run.py
@@ -0,0 +1,680 @@
+#!/usr/bin/env python3
+# ============LICENSE_START=======================================================
+# ml-prediction-ms
+# ================================================================================
+# Copyright (C) 2023 Wipro Limited
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import os
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # or any {'0', '1', '2'}
+
+import pandas as pd
+from pandas import DataFrame, read_excel,concat
+from tensorflow.keras.models import load_model
+from sklearn.preprocessing import MinMaxScaler
+from numpy import concatenate
+from datetime import datetime
+import json
+import time
+import requests
+from requests.auth import HTTPBasicAuth
+import logging
+import configparser
+import uuid
+
+from confluent_kafka import Consumer
+from confluent_kafka import Producer
+import socket
+
+global window_size
+window_size=4
+
+# Create and configure logger
+import io, os, sys
+try:
+ # Python 3, open as binary, then wrap in a TextIOWrapper with write-through.
+ sys.stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
+ # If flushing on newlines is sufficient, as of 3.7 you can instead just call:
+ sys.stdout.reconfigure(line_buffering=True)
+except TypeError:
+ # older version of Python
+ pass
+
+
+
+logging.basicConfig(filename="IntelligentSliceMl.log",
+ format='%(asctime)s - %(levelname)s - %(levelno)s - %(process)d - %(name)s - %(message)s',
+ filemode='w')
+
+# Creating an object
+logger = logging.getLogger('ml_ms_prediction')
+logger.setLevel(logging.DEBUG)
+
+logger.info("traceID-%s : Start Prediction", str(uuid.uuid4())[:8])
+
+class Config:
+
+ def __init__(self):
+ config = configparser.ConfigParser()
+ config.readfp(open(r'ml-prediction-ms.config'))
+
+ self.pathToStartPMData = config.get('PM DATA TOPICS', 'PathToStartPMData')
+ self.pathToStopPMData = config.get('PM DATA TOPICS', 'PathToStopPMData')
+ self.getDataTopic = config.get('PM DATA TOPICS', 'GetDataTopic')
+
+ self.pathToGetConfigData = config.get('CONFIG DATA TOPICS', 'PathToGetConfigData')
+ self.getPolicyTopic = config.get('POLICY UPDATE', 'GetPolicyTopic')
+
+ self.serverName = config.get('SERVER NAME', 'ServerName')
+
+ def get_pathToStartPMData(self):
+ return self.pathToStartPMData
+
+ def get_pathToStopPMData(self):
+ return self.pathToStopPMData
+
+ def get_DataTopic(self):
+ return self.getDataTopic
+
+ def get_pathToGetConfigData(self):
+ return self.pathToGetConfigData
+
+ def get_PolicyTopic(self):
+ return self.getPolicyTopic
+
+ def get_serverName(self):
+ return self.serverName
+
+class Parser:
+
+ def __init__(self):
+ self.Config_Object=Config()
+
+ def Data_Parser(self, data_val,data_dic,features, slice_name):
+ """
+ Perform Data Parser
+ READ THE ACTUAL PARAMETERS FROM THE topic MESSAGE AND ADDS IT INTO A DICTIONARY
+ Args:
+ cells_data_list: Cell data list object
+ data_dic: The Parsed data on cell data contained in dictionary
+ features: Data featurs (PM Metrics)
+ slice_name : Slice name
+ Returns:
+ data_dic: none
+ Raises:
+ RuntimeError: Error while Process Slices Cells Data.
+ """
+ try:
+ len_data_val=len(data_val)
+ for i in range(len_data_val):
+ cell_id = data_val[i]['measObjInstId']
+ response = requests.get(self.Config_Object.get_pathToGetConfigData() + cell_id + '/snssai/'+ slice_name, timeout=3)
+ config_data=response.json()['maxNumberOfConns']
+
+ results= data_val[i]['measResults']
+ len_results=len(results)
+ for j in range(len_results):
+ p=int(results[j]['p'])
+ value=int(results[j]['sValue'])
+ key = slice_name+'_'+cell_id +'_'+features[p-1].split('-')[0]
+ if key not in data_dic:
+ data_dic[key]=[value]
+ else:
+ data_dic[key].append(value)
+ #We are normalising the prediction, we are have the actual prediction starting from the 5th time instance
+ #so for the first 4 time instances we are generating synthetic data generation for prediction result
+ #this is done as a softmax correction, essential for better accuracy
+ #After the first 4 time instances the predicted values are used and and taken forward.
+ for j in range(3,5):
+ key = slice_name+'_'+cell_id +features[j]
+ if key not in data_dic:
+ data_dic[key]=[config_data]
+ elif j==3:
+ data_dic[key].append(config_data)
+ elif j==4:
+ change = (
+ data_dic[slice_name + "_" + cell_id + "_SM.PDUSessionSetupFail.0"][-1]
+ / data_dic[slice_name + "_" + cell_id + "_SM.PDUSessionSetupReq.01"][-1]
+ - data_dic[slice_name + "_" + cell_id + "_SM.PDUSessionSetupFail.0"][-2]
+ / data_dic[slice_name + "_" + cell_id + "_SM.PDUSessionSetupReq.01"][-2]
+ )
+ data_dic[key].append(change*config_data+config_data)
+ except Exception as e:
+ logger.error("traceID-%s Error in Parser Slices Cells Data:\n%s", str(uuid.uuid4())[:8], e)
+ except requests.Timeout as error:
+ logger.critical("traceId-%s Timeout from Get CONFIG DATA topic :\n%s",str(uuid.uuid4())[:8], error)
+ return data_dic
+
+class Prediction:
+
+ # Time Series Prediction using the LSTM Model and appplies the Logic to give the final predicted output
+ modelfile = 'model/best_model.h5'
+ model= load_model(modelfile, compile=False)
+
+ def __init__(self):
+ self.Config_Object=Config()
+
+ def IsPolicyUpdate_url_Exist(self):
+ """
+ Get the status of the Policy response topic by checking its execution status for unit test module.
+
+ Args:
+ none: none
+ Returns:
+ status:Bool status of Topic
+
+ """
+ status = True
+ try:
+ Post_Message ={}
+ json_object = json.dumps(Post_Message, indent = 4)
+ conf = {'bootstrap.servers': "kafka:9092",'client.id': socket.gethostname()}
+
+ producer = Producer(conf)
+ producer.produce(self.Config_Object.get_PolicyTopic(), value=json_object.encode('utf-8'))
+ producer.poll(1)
+ except Exception as e:
+ status = False
+ except requests.Timeout as error:
+ status = False
+ return status
+
+ def series_to_supervised(self, data, n_in=1, n_out=1, dropnan=True):
+ """
+ Convert the timeseries into Forecast series
+ Args:
+ data: Input time series data to be processed
+ n_in: Input window size for time series to carry previous nth time instance value.
+ n_out: output future n time instance value to be forecasted against the timeseries
+ dropnan : Flag to drop nan values
+
+ Returns:
+ agg (list): Aggregated list of past time series inputs as per the input window and the time series of the predicted future time instance.
+
+ Raises:
+ RuntimeError: Error Pre Processing Slices Cells Data.
+ """
+ try:
+ n_vars = 1 if type(data) is list else data.shape[1]
+ df = DataFrame(data)
+ cols, names = [],[]
+ # input sequence (t-n, ... t-1)
+ for i in range(n_in, 0, -1):
+ cols.append(df.shift(i))
+ names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
+ # forecast sequence (t, t+1, ... t+n)
+ for i in range(0, n_out):
+ cols.append(df.shift(-i))
+ if i == 0:
+ names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
+ else:
+ names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
+ # put it all together
+ agg = concat(cols, axis=1)
+ agg.columns = names
+
+ # drop rows with NaN values
+ if dropnan:
+ agg.dropna(inplace=True)
+
+ except Exception as e:
+ logger.error("traceID-%s Error Pre Processing Slices Cells Data f:\n%s",str(uuid.uuid4())[:8], e)
+ return agg
+
+ def Predict_Model(self, test):
+ """
+ Does the Actual Prediction on the Input data
+
+ Args:
+ test: Input data to model with current and last 4 time instances (window_size)
+ Returns:
+ inv_yhat= A 2-D list with predicted results.
+ Raises:
+ RuntimeError: Error in Prediction.
+ """
+
+ inv_yhat =[]
+
+ try:
+ scaler = MinMaxScaler(feature_range=(-1, 1))
+ scaled = scaler.fit_transform(test)
+ reframed = self.series_to_supervised(scaled,window_size, 1)
+ test=reframed.values
+ test_X= test[:, :window_size*scaled.shape[1] + scaled.shape[1]-1]
+ test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
+ yhat = self.model.predict(test_X)
+ test_X = test_X.reshape((test_X.shape[0], test_X.shape[2]))
+ inv_yhat = concatenate((test_X[:,-4:],yhat), axis=1)
+ inv_yhat = scaler.inverse_transform(inv_yhat)
+ except Exception as e:
+ logger.critical("traceId-%s Error in Prediction:\n%s",str(uuid.uuid4())[:8], e)
+ return inv_yhat
+
+ def Logic(self, Prev_Thresh,Current_Thresh):
+ """
+ Post prediction, Applies the post processing Logic i.e
+ (+-)10% cap on Slice config and (+-)25% on cell config against the predicted values
+
+ Args:
+ Prev_Thresh: List of Previous Configuration('maxumberOfConns') as read from config DB
+ Current_Thresh: List of Predicted Configuration('maxumberOfConns')
+ Returns:
+ Final_Pred_Val= List of Final Configuration('maxumberOfConns')
+ Raises:
+ RuntimeError: Error in Prediction.
+ """
+ try:
+ Sum_Prev_Thresh=sum(Prev_Thresh)
+ Sum_Pred_Thresh= sum(Current_Thresh)
+ Global_change= abs(sum(Current_Thresh)-sum(Prev_Thresh))/sum(Prev_Thresh)
+ #logger.info('Global_change',Global_change)
+ Final_Pred_Val=[]
+ Percent_Change=[]
+ sum_Pred_thresh_change=0
+ # Rule 1 is applied to compute cell based Min /Max (-25%, 25%)
+ len_Prev_Thresh=len(Prev_Thresh)
+ for cell_instance, prev_t in enumerate(Prev_Thresh):
+ if (Current_Thresh[cell_instance]-prev_t)/prev_t > 0.25:
+ Rule_based_Percent = 0.25 # rule bases total percentage
+ elif (Current_Thresh[cell_instance]-prev_t)/prev_t <-0.25:
+ Rule_based_Percent = -0.25
+ else:
+ Rule_based_Percent=(Current_Thresh[cell_instance]-prev_t)/prev_t
+
+ Percent_Change.append(Rule_based_Percent)
+ # predicted sum of threshold change for all cells
+ sum_Pred_thresh_change=sum_Pred_thresh_change+Rule_based_Percent
+
+
+ if Global_change <= 0.10:
+ for cell_instance, prev_t in enumerate(Prev_Thresh):
+ Final_Pred_Val.append(prev_t+prev_t*Percent_Change[cell_instance])
+ else:
+ #Rule 2 - to distribut global threshold to all cells based on only 10% increase in slice
+ Thresh_Rule_2 = []
+ extra = 0.1*Sum_Prev_Thresh
+
+ for i in range(len_Prev_Thresh):
+ new_val = Prev_Thresh[i]+extra*Percent_Change[i]/abs(sum_Pred_thresh_change)
+ if abs(extra*Percent_Change[i]/abs(sum_Pred_thresh_change))> abs(Percent_Change[i]*Prev_Thresh[i]):
+ new_val = Prev_Thresh[i]+Prev_Thresh[i]*Percent_Change[i]
+ Final_Pred_Val.append(new_val)
+ except Exception as e:
+ logger.error("traceId-%s Error in Post_Prediction_Logic:\n%s", str(uuid.uuid4())[:8], e)
+ return Final_Pred_Val
+
+ def acked(err, msg):
+ """
+ Function to format the error in case of exception being None
+
+ Args:
+ err: Exception object
+ msg: Error message
+ Returns:
+ None: None
+
+ """
+ if err is not None:
+ logger.error("traceId-%s Failed to deliver message: %s",str(uuid.uuid4())[:8], str(err))
+ else:
+ logger.info('traceId-%s %s',str(uuid.uuid4())[:8], (str(msg)))
+
+
+ def Final_Post_Method(self, Predicted_Results, Previous_Results, slices, server_name):
+ """
+ Posts the final predicted output (Final Output of the ML Service)
+ Args:
+ Predicted_Results: Contains Predicted results w.r.t the cell ID
+ Previous_Results: Contains Previous Configured values w.r.t the cell ID
+ slices: Slice name
+ Server_name: contains server name
+ Returns:
+ status: True on post with content success, False on post failure.
+ Raises:
+ RuntimeError: Error Posting the Final Output
+ """
+ status = True
+ try:
+ Post_Message ={}
+ Post_Message["snssai"]= slices
+ Post_Message['data'] = []
+ #for server in server_info.keys():
+ temp_dict_1 = {}
+ temp_dict_1['gNBCUName'] = server_name
+ temp_dict_1['cellCUList'] = []
+ for key, Predicted_Result in Predicted_Results.items():
+ temp_dict_2={}
+ temp_dict_2['cellLocalId'] = key
+ temp_dict_2['configData'] = {}
+ temp_dict_2['configData']['maxNumberofConns'] = int(Previous_Results[key])
+ temp_dict_2['configData']['predictedMaxNumberofConns'] = int(Predicted_Result)
+ now = datetime.now()
+ dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
+ temp_dict_2['configData']['lastUpdatedTS'] = dt_string
+ temp_dict_1['cellCUList'].append(temp_dict_2)
+ Post_Message['data'].append(temp_dict_1)
+
+ json_object = json.dumps(Post_Message, indent = 4)
+ response = []
+
+ try:
+ conf = {'bootstrap.servers': "kafka:9092",'client.id': socket.gethostname()}
+ producer = Producer(conf)
+ producer.produce(self.Config_Object.get_PolicyTopic(), value=json_object.encode('utf-8'))
+
+ producer.poll(1)
+
+ except requests.Timeout as error:
+ status = False
+ logger.error("traceId-%s Posting the Final Output To Dmap Topic:\n%s",str(uuid.uuid4())[:8], error)
+ #print(response)
+ logger.info('traceId-%s %s',str(uuid.uuid4())[:8], Post_Message)
+ except Exception as e:
+ status = False
+ logger.error("traceId-%s Posting the Final Output:\n%s",str(uuid.uuid4())[:8], e)
+
+ return status
+
+class Controller:
+
+ def __init__(self):
+ self.data_dic={}
+ self.Parser_Object=Parser()
+ self.Predict_Object=Prediction()
+ self.Config_Object=Config()
+
+
+ def GetData(self, consumer):
+ """
+ Get the data generation from ransim topic on PM Data for Intelligenct slicing
+
+ Args:
+ consumer: Consumer Topic instance to get the PM Data
+ Returns:
+ pm_data: Slices and cells PM data
+
+ Raises:
+ RuntimeError: Error while Get Data from topic.
+ """
+ pm_data = []
+ try:
+ msg = consumer.poll(timeout=-1)
+
+ if msg is None:
+ # Initial message consumption may take up to
+ # `session.timeout.ms` for the consumer group to
+ # rebalance and start consuming
+ #print("Waiting...")
+ logger.info("traceId-%s PM Get Data from topic Waiting...: ",str(uuid.uuid4())[:8])
+ elif msg.error():
+ #print("ERROR: %s".format(msg.error()))
+ logger.critical("traceId-%s Error while PM Get Data from topic :\n%s",str(uuid.uuid4())[:8], msg.error())
+ else:
+ # Extract the (optional) key and value, and print.
+ #print('Received message: {}'.format(msg.value().decode('utf-8')))
+
+ pm_data = msg.value().decode('utf-8')
+
+
+ except Exception as e:
+ logger.critical("traceId-%s Error while Get Data from topic :\n%s",str(uuid.uuid4())[:8], e)
+ pm_data = []
+ except requests.Timeout as error:
+ logger.critical("traceId-%s Timeout from Get Data topic :\n%s",str(uuid.uuid4())[:8], error)
+ pm_data = []
+ return pm_data
+
+ def simulatedTestDataToReplaceTopic(self):
+ """
+ Simulate the test PM data and simulate required time seriess data and performs Predcition.
+ This function help to quickly check Tensorflow the environment,
+ that is used in the prediction and Training process.
+
+ Args:
+ none: none
+ Returns:
+ status: Execution status on predcition task
+
+ Raises:
+ RuntimeError: Error while executing prediction task.
+ """
+
+ #with pd.ExcelFile('test.xlsx', engine="openpyxl") as excel:
+ # df = pd.read_excel(excel)
+ status = True
+
+ try:
+ df = pd.read_excel('tests/unit/test.xlsx', engine='openpyxl')
+
+ new_columns1=[]
+ serverName = self.Config_Object.get_serverName()
+ #new_columns2=[]
+ len_dfcolumns=len(df.columns)
+ for i in range(len_dfcolumns):
+ new_columns1.append('01-B989BD_'+df.columns[i])
+ df.columns=new_columns1
+ slice_name=df.columns[0].split('.')[0]
+ data_df=pd.DataFrame()
+ len_df=len(df)
+ for i in range(len_df-1):
+ temp_df=df.iloc[[i]]
+ data_df=data_df.append(temp_df)
+ # parse pm data + configured data + predicted dummy data(=configured data- to be changed after pred)
+ if len(data_df)<window_size+1:
+ continue
+
+ configured={}
+ predicted={}
+ len_data_dfcol=len(data_df.columns)
+ for x in range(0,len_data_dfcol,window_size+1):
+ test=data_df.iloc[-5:,x:x+5]
+ cell=test.columns[0].split('_')[1]
+ inv_yhat = self.Predict_Object.Predict_Model(test) # Predict using model
+ configured[cell]= test.iat[-2,4]
+ inv_yhat = float(inv_yhat[:,-1])
+ predicted[cell]=inv_yhat
+ updated_predicted= self.Predict_Object.Logic(list(configured.values()), list(predicted.values()))
+ count=0
+ for x in range(0,len_data_dfcol, window_size+1):
+ data_df.iloc[[i],[x+4]]= updated_predicted[count]
+ count+=1
+ ret = self.Predict_Object.Final_Post_Method(predicted, configured, slice_name, serverName)
+ if(ret == False):
+ status = False
+ break
+
+
+ results=data_df
+ logger.info("traceId-%s predicted data: ",str(uuid.uuid4())[:8])
+ logger.debug(predicted)
+ dateTimeObj = datetime.now()
+ timestampStr = dateTimeObj.strftime("%d-%b-%Y-%H-%M-%S")
+ #print('Current Timestamp : ', timestampStr)
+
+
+ results.to_csv(timestampStr+"predicted.csv", index=0)
+ except Exception as e:
+ logger.critical("traceId-%s Error while prediction :\n%s",str(uuid.uuid4())[:8], e)
+ status = False
+ return status
+
+
+ def PreprocessAndPredict(self,pm_data):
+ """
+ Preprocess the data and start the prediction for each slices
+
+ Args:
+ pm_data: Performance Monitoring Data of all slices and cells collection
+ Returns:
+ status: Preprocssing and Prediction status
+
+ Raises:
+ RuntimeError: Error while Preprocessing data during prediction.
+ """
+ status = True
+ try:
+ len_pm_data=len(pm_data)
+ temp_data=json.loads(pm_data[len_pm_data-1])
+ sub_data1 = temp_data['event']['perf3gppFields']['measDataCollection']['measInfoList']
+
+ len_sub_data=len(sub_data1)
+ for r in range(len_sub_data):
+ data_dic1={}
+ server_name = temp_data['event']['perf3gppFields']['measDataCollection']['measuredEntityDn']
+ sub_data = temp_data['event']['perf3gppFields']['measDataCollection']['measInfoList'][r]
+
+ features=sub_data['measTypes']['sMeasTypesList']
+
+ features.extend(['_maxNumberOfConns.configured', '_maxNumberOfConns.predicted'])
+
+ slice_name=features[0].split('.')[2]
+ data_val= sub_data['measValuesList']
+
+ data_dic1= self.Parser_Object.Data_Parser(data_val,data_dic1,features,slice_name)
+
+ #data_df=pd.DataFrame(self.data_dic)
+ data_df=pd.DataFrame(data_dic1)
+
+ predic_res=self.predictionandresults(data_df)
+ except Exception as e:
+ logger.critical("traceId-%s Error while Preprocessing data during prediction :\n%s",str(uuid.uuid4())[:8], e)
+
+ return status
+
+
+
+ def predictionandresults(self,data_df):
+ """
+ Process the data and start the prediction for each cell in slice
+
+ Args:
+ pm_data: Performance Monitoring Data of a slices and cells collection
+ Returns:
+ status: Prediction status
+
+ Raises:
+ RuntimeError: Error while prediction.
+ """
+ status = True
+ try:
+ data_df1=pd.DataFrame()
+ slice_name=data_df.columns[0].split('.')[0]
+ len_data_df=len(data_df)
+ print('lendatadf:',len_data_df)
+ for i in range(len_data_df):
+ temp_df=data_df.iloc[[i]]
+ data_df1=data_df1.append(temp_df)
+ if len(data_df1)<window_size+1:
+ continue
+
+ configured={}
+ predicted={}
+ len_data_dfcol=len(data_df.columns)
+ for x in range(0,len_data_dfcol,window_size+1):
+ test=data_df1.iloc[-5:,x:x+5]
+ cell=test.columns[0].split('_')[1]
+ inv_yhat = self.Predict_Object.Predict_Model(test) # Predict using model
+ configured[cell]= float(test.iat[-2,4])
+ inv_yhat = float(inv_yhat[:,-1])
+ predicted[cell]=inv_yhat
+ #data_df.iloc[[i],[x+4]]=inv_yhat
+ #self.data_dic[data_df.columns[x+4]][-1] = inv_yhat
+ #logger.info("predicted data: "+ predicted)
+ logger.info("traceId-%s SUCCESS predicted data: ",str(uuid.uuid4())[:8])
+ logger.debug( predicted)
+ updated_predicted= self.Predict_Object.Logic(list(configured.values()), list(predicted.values()))
+ #logger.info('updated',updated_predicted)
+ logger.info('traceId-%s SUCCESS updated:', str(uuid.uuid4())[:8])
+ logger.debug(updated_predicted)
+ count=0
+ for x in range(0,len_data_dfcol, window_size+1):
+
+ data_df1.iloc[[i],[x+4]]= updated_predicted[count]
+ count+=1
+
+ self.Predict_Object.Final_Post_Method(predicted, configured, slice_name, server_name)
+
+
+ if len(data_df)>=window_size+1:
+ results=pd.DataFrame(self.data_dic)
+ #print("Predicted Results:",results)
+ dateTimeObj = datetime.now()
+ timestampStr = dateTimeObj.strftime("%d-%b-%Y-%H-%M-%S")
+ #print('Current Timestamp : ', timestampStr)
+ results.to_csv(timestampStr+"predicted.csv", index=0)
+ except Exception as e:
+ logger.critical("traceId-%s Error while Preprocessing data during prediction :\n%s",str(uuid.uuid4())[:8], e)
+ status = False
+
+ return status
+
+
+ def Execute(self):
+ """
+ Executes workflow of task methods to get data from topics, then performs preprocessng and Prediction.
+
+ Args:
+ none: none
+ Returns:
+ none: none
+
+ Raises:
+ RuntimeError: Error during Prediction start process.
+ """
+ status = True
+ bExecute = True
+ pm_data = []
+ try:
+ conf = {'bootstrap.servers': "kafka:9092",'group.id': "1",'auto.offset.reset': 'smallest'}
+
+ consumer = Consumer(conf)
+ consumer.subscribe(([self.Config_Object.get_DataTopic(), -1]))
+
+ while bExecute:
+ #self.StartDataGeneration()
+ #time.sleep(15)
+
+ #self.StopDataGeneration()
+ #time.sleep(5)
+ while True:
+ pm_data = self.GetData(consumer)
+
+ if pm_data==[]:
+ # Delay for 1 minute (60 seconds)
+ #time.sleep(60)
+ break
+
+ self.PreprocessAndPredict(pm_data)
+
+
+ except Exception as e:
+ logger.critical("traceId-%s Error during Prediction start process f:\n%s",str(uuid.uuid4())[:8], e)
+ status = False
+
+ return status
+
+
+if __name__ == "__main__":
+ try:
+
+ time.sleep(60)
+ logger.info("traceId-%s : Start Prediction",str(uuid.uuid4())[:8])
+ Controller_Object = Controller()
+ Controller_Object.Execute()
+ #unit test code
+ #Controller_Object.simulatedTestDataToReplaceTopic()
+ except Exception as e:
+ logger.critical("traceId-%s Error onStart Prediction Process:\n%s",str(uuid.uuid4())[:8], e)
+