aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/simulator/pnf/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/simulator/pnf/utils.py')
-rw-r--r--src/onaptests/steps/simulator/pnf/utils.py157
1 files changed, 0 insertions, 157 deletions
diff --git a/src/onaptests/steps/simulator/pnf/utils.py b/src/onaptests/steps/simulator/pnf/utils.py
deleted file mode 100644
index efac46e..0000000
--- a/src/onaptests/steps/simulator/pnf/utils.py
+++ /dev/null
@@ -1,157 +0,0 @@
-"""Utility functions that invoke operations of simulator script."""
-import os
-import sys
-import time
-import urllib.parse
-import yaml
-from ipaddress import ip_address
-from typing import Dict, Optional
-from decorator import decorator
-import docker
-
-from onapsdk.configuration import settings
-from onaptests.masspnfsimulator.MassPnfSim import (
- MassPnfSim, get_parser
-)
-
-def get_config() -> Dict:
- """Read a config YAML file."""
- config = None
- dir_path = os.path.dirname(os.path.realpath(__file__))
- with open(f"{dir_path}/pnf_config.yaml", "r") as ymlfile:
- config = yaml.load(ymlfile)
- return config
-
-def get_default_args() -> None:
- """Prepare default arguments for required operations.
-
- Returns:
- args (argparse.Namespace): default arguments.
-
- """
- parser = get_parser()
- args = parser.parse_args('')
- return args
-
-def switch_workdir(back_pwd: str = None) -> Optional[str]:
- """Switch work directory temporarily for PNF simulator operations.
-
- When `back_pwd` path is provided, it means go back tp the repository
- you came from.
-
- Arguments:
- back_pwd: path to go back to.
-
- Returns:
- old_pwd (str): previous path.
-
- """
- sim_file_path = sys.modules[MassPnfSim.__module__].__file__
- sim_dir_path = os.path.dirname(sim_file_path)
-
- old_pwd = os.getcwd()
-
- if not back_pwd:
- curr_pwd = sim_dir_path
- else:
- curr_pwd = back_pwd
-
- os.chdir(curr_pwd)
- return old_pwd
-
-@decorator
-def chdir(func, *args, **kwargs):
- """Switches to and from the simulator workspace."""
- old_pwd = switch_workdir()
- ret = func(*args, **kwargs)
- switch_workdir(old_pwd)
- return ret
-
-@chdir
-def build_image() -> None:
- """Build simulator image."""
- build = getattr(MassPnfSim(), "build")
- args = get_default_args()
- build(args)
-
-@chdir
-def remove_image() -> None:
- """Remove simulator image(s)."""
- client = docker.from_env()
- sim_image_name = "nexus3.onap.org:10003/onap/masspnf-simulator"
- images = client.images.list(sim_image_name)
- for obj in images:
- client.images.remove(obj.id, force=True)
-
-@chdir
-def bootstrap_simulator() -> None:
- """Setup simulator(s) repo, data and configs."""
- args = get_default_args()
- config = get_config()
-
- # collect settings that will be placed in the simulator directory
- vesprotocol = config["setup"].get('vesprotocol', "http")
- ves_url = urllib.parse.urlparse(settings.VES_URL)
- vesip = ves_url.hostname
- vesport = ves_url.port
- vesresource = config["setup"].get('vesresource', "")
- vesversion = config["setup"].get('vesversion', "")
-
- urlves = f"{vesprotocol}://{vesip}:{vesport}/{vesresource}/{vesversion}"
-
- # assign to simulator's arguments
- args.count = config["setup"].get('count', 1)
- args.urlves = urlves
- args.ipstart = ip_address(config["setup"].get('ipstart', ''))
- args.ipfileserver = config["setup"].get('ipfileserver', '')
- args.typefileserver = config["setup"].get('typefileserver', '')
- args.user = config["setup"].get('user', '')
- args.password = config["setup"].get('password', '')
-
- # bootstrap with assigned arguments
- bootstrap = getattr(MassPnfSim(), "bootstrap")
- bootstrap(args)
-
-@chdir
-def run_container() -> None:
- """Run simulator container(s)."""
- start = getattr(MassPnfSim(), "start")
- args = get_default_args()
- start(args)
-
-@chdir
-def register() -> None:
- """Send an event to VES.
-
- Use time.sleep(seconds) if registering with VES right after run_container().
- Containers take a few seconds to run properly. Normally 5 seconds should be
- enough.
-
- """
- time.sleep(5)
- config = get_config()
- trigger = getattr(MassPnfSim(), "trigger")
- args = get_default_args()
-
- args.user = config['setup'].get('user', '')
- args.password = config['setup'].get('password', '')
-
- custom_data = config['data']
- if custom_data:
- args.data = custom_data
-
- trigger(args)
-
-@chdir
-def stop_container() -> None:
- """Stop simulator container(s)."""
- stop = getattr(MassPnfSim(), "stop")
- args = get_default_args()
- stop(args)
-
-@chdir
-def remove_simulator() -> None:
- """Remove simulator container(s)."""
- clean = getattr(MassPnfSim(), "clean")
- args = get_default_args()
- clean(args)