# ============LICENSE_START======================================================= # ml-prediction-ms # ================================================================================ # Copyright (C) 2023 Wipro Limited # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= import pytest from src.run import Parser, Prediction, Controller import unittest import requests import responses from unittest import TestCase from unittest import mock from mock import patch # for Python >= 3.3 use unittest.mock import pandas as pd import numpy as np from pandas import DataFrame, read_csv, read_excel from pandas import concat from tensorflow.keras.models import load_model from sklearn.preprocessing import MinMaxScaler from numpy import concatenate from sklearn.metrics import mean_squared_error from math import sqrt from datetime import datetime import json import time import requests from requests.auth import HTTPBasicAuth from confluent_kafka import Consumer from confluent_kafka import Producer import socket import requests_mock from mock import patch # This method will be used by the mock to replace requests.get def mocked_requests_get(*args, **kwargs): class MockResponse: def __init__(self, json_data, status_code): self.json_data = json_data self.status_code = status_code def json(self): return self.json_data return MockResponse({"key1": "value1"}, 200) # Our test case class class ControllerTestCase(unittest.TestCase): # We patch 'requests.get' with our own method. The mock object is passed in to our test case method. @mock.patch('requests.get', side_effect=mocked_requests_get) def test_GetData(self, mock_get): status = True # Assert requests.get calls ctl = Controller() conf = {'bootstrap.servers': "kafka:9092",'group.id': "1",'auto.offset.reset': 'smallest'} consumer = Consumer(conf) consumer.subscribe(([ctl.Config_Object.get_DataTopic(), 1])) # We can even assert that our mocked method was called with the right parameters msg = consumer.poll(1) #json_data = msg.value().decode('utf-8') #assert len(msg) != 0, "the list is non empty" assert status != False def test_simulatedTestDataToReplaceTopic(self): self.Controller_Object = Controller() status = self.Controller_Object.simulatedTestDataToReplaceTopic() assert status != False def test_PreprocessAndPredict(self): ctl = Controller() # Opening JSON file f = open('tests/unit/sample.json',) # returns JSON object as # a dictionary json_data = json.load(f) status = ctl.PreprocessAndPredict(json_data) assert status != False # This method will be used by the mock to replace requests.POST def mocked_requests_post(*args, **kwargs): class MockResponse: def __init__(self, json_data, status_code): self.json_data = json_data self.status_code = status_code def json(self): return self.json_data return MockResponse({"key1": "value1"}, 200) #return MockResponse(None, 404) # Our test case class class PredictionTestCase(unittest.TestCase): # We patch 'requests.get' with our own method. The mock object is passed in to our test case method. @mock.patch('requests.post', side_effect=mocked_requests_post) def test_IsPolicyUpdate_url_Exist(self, mock_post): # Assert requests.post calls pred = Prediction() status = pred.IsPolicyUpdate_url_Exist() assert status == True, "Failed" class TestPredict(unittest.TestCase): def test_Parser(self): Controller_Object = Controller() conf = {'bootstrap.servers': "kafka:9092",'group.id': "1",'auto.offset.reset': 'smallest'} consumer = Consumer(conf) consumer.subscribe(([self.Config_Object.get_DataTopic(), -1])) pm_data = Controller_Object.GetData(consumer) Parser_Object = Parser() data_dic={} status = False len_pm_data=len(pm_data) for i in range(len_pm_data): temp_data=json.loads(pm_data[i]) sub_data = temp_data['event']['perf3gppFields']['measDataCollection']['measInfoList'][0] server_name = temp_data['event']['perf3gppFields']['measDataCollection']['measuredEntityDn'] features=sub_data['measTypes']['sMeasTypesList'] features.extend(['_maxNumberOfConns.configured', '_maxNumberOfConns.predicted']) slice_name=features[0].split('.')[2] data_val= sub_data['measValuesList'] data_dic= Parser_Object.Data_Parser(data_val,data_dic,features,slice_name) data_df=pd.DataFrame(data_dic) if len(data_df)