summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/kube2msb/kube2msb_test.go95
-rw-r--r--src/kube2msb/kube_work.go6
-rw-r--r--src/kube2msb/kube_work_test.go28
3 files changed, 112 insertions, 17 deletions
diff --git a/src/kube2msb/kube2msb_test.go b/src/kube2msb/kube2msb_test.go
index eea0f92..63efd8e 100644
--- a/src/kube2msb/kube2msb_test.go
+++ b/src/kube2msb/kube2msb_test.go
@@ -110,3 +110,98 @@ func TestSendPodWork(t *testing.T) {
}
}
}
+
+func TestRunBookKeeper(t *testing.T) {
+ kubeWorkQueue := make(chan KubeWork)
+ msbWorkQueue := make(chan MSBWork)
+
+ go runBookKeeper(kubeWorkQueue, msbWorkQueue)
+
+ serviceCases := []struct {
+ work KubeWork
+ ip string
+ msbAction MSBWorkAction
+ }{
+ {
+ KubeWork{
+ Action: KubeWorkAddService,
+ Service: createMockService("RunBookKeeper", "127.0.0.1", kapi.ServiceTypeClusterIP),
+ },
+ "127.0.0.1",
+ MSBWorkAddService,
+ },
+ {
+ KubeWork{
+ Action: KubeWorkUpdateService,
+ Service: createMockService("RunBookKeeper", "127.0.0.2", kapi.ServiceTypeNodePort),
+ },
+ "127.0.0.2",
+ MSBWorkAddService,
+ },
+ {
+ KubeWork{
+ Action: KubeWorkRemoveService,
+ Service: createMockService("RunBookKeeper", "127.0.0.3", kapi.ServiceTypeLoadBalancer),
+ },
+ "127.0.0.3",
+ MSBWorkRemoveService,
+ },
+ }
+
+ for _, c := range serviceCases {
+ // if c.work.Service.Spec.Type == kapi.ServiceTypeLoadBalancer {
+ // c.work.Service.Spec.LoadBalancerIP = "127.0.0.4"
+ // c.ip = "127.0.0.4"
+ // }
+ kubeWorkQueue <- c.work
+
+ if c.work.Action == KubeWorkUpdateService {
+ msbWorkValidate(t, msbWorkQueue, c.work.Service, MSBWorkRemoveService, c.ip)
+ msbWorkValidate(t, msbWorkQueue, c.work.Service, MSBWorkAddService, c.ip)
+ } else {
+ msbWorkValidate(t, msbWorkQueue, c.work.Service, c.msbAction, c.ip)
+ }
+ }
+
+ podCases := []struct {
+ work KubeWork
+ msbAction MSBWorkAction
+ ip string
+ }{
+ {
+ KubeWork{
+ Action: KubeWorkAddPod,
+ Pod: createMockPod("RunBookKeeper", "192.168.1.2"),
+ },
+ MSBWorkAddPod,
+ "192.168.1.2",
+ },
+ {
+ KubeWork{
+ Action: KubeWorkUpdatePod,
+ Pod: createMockPod("RunBookKeeper", "192.168.1.3"),
+ },
+ MSBWorkAddPod,
+ "",
+ },
+ {
+ KubeWork{
+ Action: KubeWorkRemovePod,
+ Pod: createMockPod("RunBookKeeper", "192.168.1.4"),
+ },
+ MSBWorkRemovePod,
+ "192.168.1.3",
+ },
+ }
+
+ for _, c := range podCases {
+ kubeWorkQueue <- c.work
+
+ if c.work.Action == KubeWorkUpdatePod {
+ msbWorkPodValidate(t, msbWorkQueue, c.work.Pod, MSBWorkRemovePod, "192.168.1.2")
+ msbWorkPodValidate(t, msbWorkQueue, c.work.Pod, MSBWorkAddPod, "192.168.1.3")
+ } else {
+ msbWorkPodValidate(t, msbWorkQueue, c.work.Pod, c.msbAction, c.ip)
+ }
+ }
+}
diff --git a/src/kube2msb/kube_work.go b/src/kube2msb/kube_work.go
index b7965b5..8e15671 100644
--- a/src/kube2msb/kube_work.go
+++ b/src/kube2msb/kube_work.go
@@ -105,7 +105,7 @@ func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) {
client.msbQueue <- MSBWork{
Action: MSBWorkRemoveService,
ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
- IPAddress: svc.Spec.LoadBalancerIP,
+ IPAddress: svc.Spec.ClusterIP,
}
} else {
log.Printf("Service Type:%s for Service:%s is not supported", svc.Spec.Type, svc.Name)
@@ -152,7 +152,7 @@ func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) {
IPAddress: pod.Status.PodIP,
}
client.pods[pod.Name] = pod
- log.Println("Queued Pod to be added: ", pod.Name)
+ log.Println("Queued Pod to be added: ", pod.Name, pod.Status.PodIP)
}
func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
@@ -173,8 +173,8 @@ func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
ServiceInfo: pod.Annotations[serviceKey],
IPAddress: client.pods[pod.Name].Status.PodIP,
}
+ log.Println("Queued Pod to be removed: ", pod.Name, client.pods[pod.Name].Status.PodIP)
delete(client.pods, pod.Name)
- log.Println("Queued Pod to be removed: ", pod.Name)
}
func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) {
diff --git a/src/kube2msb/kube_work_test.go b/src/kube2msb/kube_work_test.go
index 43ae17e..08c85b9 100644
--- a/src/kube2msb/kube_work_test.go
+++ b/src/kube2msb/kube_work_test.go
@@ -174,12 +174,12 @@ func createMockPod(name string, ip string) *kapi.Pod {
return &pod
}
-func msbWorkPodValidate(t *testing.T, queue <-chan MSBWork, pod *kapi.Pod, action MSBWorkAction) {
+func msbWorkPodValidate(t *testing.T, queue <-chan MSBWork, pod *kapi.Pod, action MSBWorkAction, ip string) {
work := <-queue
- if work.Action != action || work.IPAddress != pod.Status.PodIP || work.ServiceInfo != pod.Name {
- t.Errorf("expect %s,%s,%s to be %s %s,%s",
- work.Action, work.IPAddress, work.ServiceInfo, action, pod.Status.PodIP, pod.Name)
+ if work.Action != action || work.IPAddress != ip || work.ServiceInfo != pod.Name {
+ t.Errorf("expect %s,%s,%s to be %s, %s,%s",
+ work.Action, work.IPAddress, work.ServiceInfo, action, ip, pod.Name)
}
}
@@ -191,7 +191,7 @@ func TestAddPodKube(t *testing.T) {
// add ServiceTypeClusterIP
pod := createMockPod("addPodTest", "192.168.10.10")
client.AddPod(pod)
- msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkAddPod)
+ msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkAddPod, "192.168.10.10")
if _, ok := client.pods[pod.Name]; !ok {
t.Errorf("add pod error, pod not exists in client.pods")
}
@@ -239,9 +239,9 @@ func TestRemovePodKube(t *testing.T) {
if _, ok := client.pods[pod.Name]; !ok {
t.Errorf("add pod error, pod not exists in client.pods")
}
- msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkAddPod)
+ msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkAddPod, "192.168.10.10")
client.RemovePod(pod)
- msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkRemovePod)
+ msbWorkPodValidate(t, msbWorkQueue, pod, MSBWorkRemovePod, "192.168.10.10")
if _, ok := client.pods[pod.Name]; ok {
t.Errorf("remove pod error, pod still exists in client.pods")
}
@@ -261,16 +261,16 @@ func TestUpdatePodKube(t *testing.T) {
// update exist Pod
existPod := createMockPod("mockPod", "192.168.10.11")
client.AddPod(existPod)
- msbWorkPodValidate(t, msbWorkQueue, existPod, MSBWorkAddPod)
+ msbWorkPodValidate(t, msbWorkQueue, existPod, MSBWorkAddPod, "192.168.10.11")
if _, ok := client.pods[existPod.Name]; !ok {
t.Errorf("add pod error, pod not exists in client.pods")
}
// update service info
- existPod.Status.PodIP = "0.0.0.0"
- client.UpdatePod(existPod)
- msbWorkPodValidate(t, msbWorkQueue, existPod, MSBWorkRemovePod)
- msbWorkPodValidate(t, msbWorkQueue, existPod, MSBWorkAddPod)
- if updatedExistPod, ok := client.pods[existPod.Name]; !ok || updatedExistPod.Status.PodIP != existPod.Status.PodIP {
+ updatePod := createMockPod("mockPod", "0.0.0.0")
+ client.UpdatePod(updatePod)
+ msbWorkPodValidate(t, msbWorkQueue, updatePod, MSBWorkRemovePod, "192.168.10.11")
+ msbWorkPodValidate(t, msbWorkQueue, updatePod, MSBWorkAddPod, "0.0.0.0")
+ if updatedExistPod, ok := client.pods[existPod.Name]; !ok || updatedExistPod.Status.PodIP != updatePod.Status.PodIP {
t.Errorf("add pod error, pod not exists in client.pods")
}
@@ -280,7 +280,7 @@ func TestUpdatePodKube(t *testing.T) {
t.Errorf("mockNotExistPod should not exist before it has been added")
}
client.UpdatePod(notExistPod)
- msbWorkPodValidate(t, msbWorkQueue, notExistPod, MSBWorkAddPod)
+ msbWorkPodValidate(t, msbWorkQueue, notExistPod, MSBWorkAddPod, "192.168.10.11")
if _, ok := client.pods[notExistPod.Name]; !ok {
t.Errorf("mockNotExistPod should not exist before it has been added")
}