1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#!/usr/bin/env python3
import k8s_bin_versions_inspector as kbvi
import kubernetes
def exec_sync_post_namespaced_pod_exec(pod, command):
kubernetes.config.load_kube_config()
api = kubernetes.client.CoreV1Api()
containers = kbvi.list_all_containers(api, "")
container = next(c for c in containers if c.pod.startswith(pod))
result = kbvi.sync_post_namespaced_pod_exec(api, container, command)
return result
def test_sync_post_namespaced_pod_exec():
pod = "kbvi-test-python-jupyter"
result = exec_sync_post_namespaced_pod_exec(pod, "id")
assert result == {
"stdout": "uid=1000(jovyan) gid=100(users) groups=100(users)\n",
"stderr": "",
"error": {"status": "Success", "metadata": {}},
"code": 0,
}
def test_sync_post_namespaced_pod_exec_not_running():
pod = "kbvi-test-terminated"
result = exec_sync_post_namespaced_pod_exec(pod, "id")
assert result == {"stdout": "", "stderr": "", "error": {}, "code": -1}
def test_sync_post_namespaced_pod_exec_not_found():
pod = "kbvi-test-python-jupyter"
command = "/command/not/found"
result = exec_sync_post_namespaced_pod_exec(pod, command)
assert result["stdout"] == ""
assert result["stderr"] == ""
assert result["error"]["status"] == "Failure"
assert result["error"]["reason"] == "InternalError"
assert result["code"] == -2
def test_sync_post_namespaced_pod_exec_exit_code():
pod = "kbvi-test-python-jupyter"
command = ["python3", "--invalid-attribute"]
result = exec_sync_post_namespaced_pod_exec(pod, command)
assert result == {
"stdout": "",
"stderr": "unknown option --invalid-attribute\n"
"usage: python3 [option] ... [-c cmd | -m mod | file | -] [arg] ...\n"
"Try `python -h' for more information.\n",
"error": {
"status": "Failure",
"reason": "NonZeroExitCode",
"message": "command terminated with non-zero exit code: error "
"executing command [python3 --invalid-attribute], exit code 2",
"details": {"causes": [{"message": "2", "reason": "ExitCode"}]},
"metadata": {},
},
"code": 2,
}
def test_sync_post_namespaced_pod_exec_stderr():
pod = "kbvi-test-python-stderr-filebeat"
command = ["python", "--version"]
result = exec_sync_post_namespaced_pod_exec(pod, command)
assert result == {
"stdout": "",
"stderr": "Python 2.7.5\n",
"error": {"status": "Success", "metadata": {}},
"code": 0,
}
|