summaryrefslogtreecommitdiffstats
path: root/helm_deployment_status.py
diff options
context:
space:
mode:
Diffstat (limited to 'helm_deployment_status.py')
-rwxr-xr-xhelm_deployment_status.py91
1 files changed, 56 insertions, 35 deletions
diff --git a/helm_deployment_status.py b/helm_deployment_status.py
index 448263d5..f56935ee 100755
--- a/helm_deployment_status.py
+++ b/helm_deployment_status.py
@@ -33,22 +33,11 @@ from itertools import chain
import csv
from requests.packages.urllib3.exceptions import InsecureRequestWarning
-
def add_resource_kind(resources, kind):
for item in resources:
item['kind'] = kind
return resources
-def get_resources(server, namespace, api, kind, ssl_verify=False):
- url = '/'.join([server, api, 'namespaces', namespace, kind])
- try:
- req = requests.get(url, verify=ssl_verify)
- except requests.exceptions.ConnectionError as err:
- sys.exit('Could not connect to {}'.format(server))
- json = req.json()
- # kind is <resource>List in response so [:-4] removes 'List' from value
- return add_resource_kind(json['items'], json['kind'][:-4])
-
def pods_by_parent(pods, parent):
for pod in pods:
if pod['metadata']['labels']['app'] == parent:
@@ -87,33 +76,29 @@ def analyze_k8s_controllers(resources_data):
return resources
-def get_k8s_controllers(namespace, k8s_url):
+def get_k8s_controllers(k8s):
k8s_controllers = {}
- k8s_controllers['deployments'] = {'data': get_resources(k8s_url, namespace,
+ k8s_controllers['deployments'] = {'data': k8s.get_resources(
'apis/apps/v1', 'deployments')}
- k8s_controllers['deployments'].update(analyze_k8s_controllers(k8s_controllers['deployments']['data']))
+ k8s_controllers['deployments'].update(analyze_k8s_controllers(
+ k8s_controllers['deployments']['data']))
- k8s_controllers['statefulsets'] = {'data': get_resources(k8s_url, namespace,
+ k8s_controllers['statefulsets'] = {'data': k8s.get_resources(
'apis/apps/v1', 'statefulsets')}
- k8s_controllers['statefulsets'].update(analyze_k8s_controllers(k8s_controllers['statefulsets']['data']))
+ k8s_controllers['statefulsets'].update(analyze_k8s_controllers(
+ k8s_controllers['statefulsets']['data']))
- k8s_controllers['jobs'] = {'data': get_resources(k8s_url, namespace,
+ k8s_controllers['jobs'] = {'data': k8s.get_resources(
'apis/batch/v1', 'jobs')}
- k8s_controllers['jobs'].update(analyze_k8s_controllers(k8s_controllers['jobs']['data']))
+ k8s_controllers['jobs'].update(analyze_k8s_controllers(
+ k8s_controllers['jobs']['data']))
not_ready_controllers = chain.from_iterable(
k8s_controllers[x]['not_ready_list'] for x in k8s_controllers)
return k8s_controllers, list(not_ready_controllers)
-def get_k8s_url(kube_config):
- # TODO: Get login info
- with open(kube_config) as f:
- config = yaml.load(f)
- # TODO: Support cluster by name
- return config['clusters'][0]['cluster']['server']
-
def exec_healthcheck(hp_script, namespace):
try:
hc = subprocess.check_output(
@@ -123,12 +108,12 @@ def exec_healthcheck(hp_script, namespace):
except subprocess.CalledProcessError as err:
return err.returncode, err.output
-def check_readiness(k8s_url, namespace, verbosity):
- k8s_controllers, not_ready_controllers = get_k8s_controllers(namespace, k8s_url)
+def check_readiness(k8s, verbosity):
+ k8s_controllers, not_ready_controllers = get_k8s_controllers(k8s)
# check pods only when it is explicitly wanted (judging readiness by deployment status)
if verbosity > 1:
- pods = get_resources(k8s_url, namespace, 'api/v1', 'pods')
+ pods = k8s.get_resources('api/v1', 'pods')
unready_pods = chain.from_iterable(
get_names(not_ready_pods(
pods_by_parent(pods, x)))
@@ -139,11 +124,11 @@ def check_readiness(k8s_url, namespace, verbosity):
print_status(verbosity, k8s_controllers, unready_pods)
return not not_ready_controllers
-def check_in_loop(k8s_url, namespace, max_time, sleep_time, verbosity):
+def check_in_loop(k8s, max_time, sleep_time, verbosity):
max_end_time = datetime.datetime.now() + datetime.timedelta(minutes=max_time)
ready = False
while datetime.datetime.now() < max_end_time:
- ready = check_readiness(k8s_url, namespace, verbosity)
+ ready = check_readiness(k8s, verbosity)
if ready:
return ready
sleep(sleep_time)
@@ -204,6 +189,43 @@ def parse_args():
return parser.parse_args()
+class Kubernetes:
+ '''Class exposing get_resources() routine for connecting to kube API.
+ It keeps all attributes required by that call as an internal
+ object state.'''
+
+ requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+ def __init__(self,args):
+
+ self.config = args.kubeconfig
+ self.url = args.server if args.server is not None else \
+ self._get_k8s_url()
+ self.namespace = args.namespace
+
+ def get_resources(self, api, kind):
+ '''Performs actual API call'''
+ url = '/'.join([self.url, api, 'namespaces', self.namespace, kind])
+ try:
+ req = requests.get(url, verify=False)
+ except requests.exceptions.ConnectionError as err:
+ sys.exit('Error: Could not connect to {}'.format(self.url))
+ if req.status_code == 200:
+ json = req.json()
+ # kind is <resource>List in response so [:-4] removes 'List' from value
+ return add_resource_kind(json['items'], json['kind'][:-4])
+ elif (req.status_code == 401):
+ sys.exit('Error: Server replied with "401 Unauthorized" while making connection')
+ else:
+ sys.exit("Error: There's been an unspecified issue while making a request to the API")
+
+ def _get_k8s_url(self):
+ # TODO: Get login info
+ with open(self.config) as f:
+ config = yaml.load(f)
+ # TODO: Support cluster by name
+ return config['clusters'][0]['cluster']['server']
+
def main():
args = parse_args()
@@ -218,16 +240,15 @@ def main():
except IOError as err:
sys.exit(err.strerror)
- requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
- k8s_url = args.server if args.server is not None else get_k8s_url(args.kubeconfig)
+ k8s = Kubernetes(args)
ready = False
if args.single_run:
- ready = check_readiness(k8s_url, args.namespace, args.verbosity)
+ ready = check_readiness(k8s, args.verbosity)
else:
- if not check_in_loop(k8s_url, args.namespace, args.max_time, args.check_frequency, args.verbosity):
+ if not check_in_loop(k8s, args.max_time, args.check_frequency, args.verbosity):
# Double-check last 5 minutes and write verbosely in case it is not ready
- ready = check_readiness(k8s_url, args.namespace, 2)
+ ready = check_readiness(k8s, 2)
if args.health_path is not None:
try: