aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud/resources.py
blob: 4509dd43961d31dd30231b7d820b48d8cbdf4503 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Resources module."""


class K8sResource():
    """K8sResource class."""

    def __init__(self, k8s=None):
        """Init the k8s resource."""
        self.k8s = k8s
        self.name = ""
        self.events = []
        if self.k8s:
            self.name = self.k8s.metadata.name
            self.specific_k8s_init()

    def specific_k8s_init(self):
        """Do the specific part for k8s resource when k8s object is present."""
        pass

    def __repr__(self):
        return self.name

    def __str__(self):
        return self.name

    def __eq__(self, other):
        if (isinstance(other, K8sResource)):
            return self.name == other.name
        else:
            return False

class K8sPodParentResource(K8sResource):
    """K8sPodParentResource class."""

    def __init__(self, k8s=None):
        """Init the k8s pod parent resource."""
        self.pods = []
        self.failed_pods = 0
        super().__init__(k8s=k8s)


class Pod(K8sResource):
    """Pod class."""

    def __init__(self, k8s=None):
        """Init the pod."""
        self.containers = []
        self.init_containers = []
        self.running_containers = 0
        self.runned_init_containers = 0
        self.volumes = {}
        self.restart_count = 0
        self.init_restart_count = 0
        self.init_done = True
        super().__init__(k8s=k8s)

    def specific_k8s_init(self):
        """Specific k8s init."""
        self.set_volumes(self.k8s.spec.volumes)

    def set_volumes(self, volumes):
        """Generate the volume list."""
        if volumes is None:
            return
        for volume in volumes:
            volume_name = volume.name
            self.volumes[volume_name] = {}
            for volume_type in volume.attribute_map:
                if volume_type != "name" and getattr(volume, volume_type):
                    self._parse_volume_type(volume, volume_name, volume_type)

    def _parse_volume_type(self, volume, name, volume_type):
        """Parse volume type informations."""
        self.volumes[name][volume_type] = {}
        infos = getattr(volume, volume_type)
        for details in infos.attribute_map:
            self.volumes[name][volume_type][details] = getattr(infos, details)

    def ready(self):
        """Calculate if Pod is ready."""
        if self.init_done and self.running_containers == len(self.containers):
            return True
        return False


class Container():
    """Container class."""

    def __init__(self, name=""):
        """Init the container."""
        self.name = name
        self.status = ""
        self.ready = False
        self.restart_count = 0
        self.image = ""

    def set_status(self, status):
        """Generate status for container."""
        if status.running:
            self.status = "Running"
        else:
            if status.terminated:
                self.status = "Terminated ({})".format(
                    status.terminated.reason)
            else:
                if status.waiting:
                    self.status = "Waiting ({})".format(
                        status.waiting.reason)
                else:
                    self.status = "Unknown"


class Service(K8sPodParentResource):
    """Service class."""

    def __init__(self, k8s=None):
        """Init the service."""
        self.type = ""
        super().__init__(k8s=k8s)

    def specific_k8s_init(self):
        """Do the specific part for service when k8s object is present."""
        self.type = self.k8s.spec.type


class Job(K8sPodParentResource):
    """Job class."""


class Deployment(K8sPodParentResource):
    """Deployment class."""

class ReplicaSet(K8sPodParentResource):
    """ReplicaSet class."""

class StatefulSet(K8sPodParentResource):
    """StatefulSet class."""


class DaemonSet(K8sPodParentResource):
    """DaemonSet class."""


class Pvc(K8sResource):
    """Pvc class."""


class ConfigMap(K8sResource):
    """ConfigMap class."""


class Secret(K8sResource):
    """Secret class."""


class Ingress(K8sResource):
    """Ingress class."""