summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHuabingZhao <zhao.huabing@zte.com.cn>2017-08-28 10:53:35 +0800
committerHuabingZhao <zhao.huabing@zte.com.cn>2017-08-28 11:10:16 +0800
commitc1737d2abac61511e00f388538779d67464b8a98 (patch)
tree7e38a20f6698a6059d046019694b8dc968165283
parent3736aafdb168d76483a42acb552098244ceee034 (diff)
initial codebase for kube2msb
Issue-Id: OOM-61 Change-Id: Ibf70557f1e9277bbe07d8e0e91bf6b125cecb144 Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
-rw-r--r--kube2msb/src/kube2msb.go288
-rw-r--r--kube2msb/src/kube_work.go195
-rw-r--r--kube2msb/src/msb_client.go119
-rw-r--r--kube2msb/src/msb_work.go97
-rw-r--r--kube2msb/src/types.go135
5 files changed, 834 insertions, 0 deletions
diff --git a/kube2msb/src/kube2msb.go b/kube2msb/src/kube2msb.go
new file mode 100644
index 0000000..627405e
--- /dev/null
+++ b/kube2msb/src/kube2msb.go
@@ -0,0 +1,288 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/url"
+ "os"
+ "reflect"
+ "time"
+
+ kapi "k8s.io/kubernetes/pkg/api"
+ kcache "k8s.io/kubernetes/pkg/client/cache"
+ kclient "k8s.io/kubernetes/pkg/client/unversioned"
+ kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+ kframework "k8s.io/kubernetes/pkg/controller/framework"
+ kselector "k8s.io/kubernetes/pkg/fields"
+ klabels "k8s.io/kubernetes/pkg/labels"
+)
+
+var (
+ argMSBUrl = flag.String("msb_url", "", "URL to MSB backend")
+ argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
+ addMap = make(map[string]*kapi.Pod)
+ deleteMap = make(map[string]*kapi.Pod)
+ nodeSelector = klabels.Everything()
+)
+
+const resyncPeriod = 5 * time.Second
+
+func getMSBUrl() (string, error) {
+ if *argMSBUrl == "" {
+ return "", fmt.Errorf("no --msb_url specified")
+ }
+ parsedUrl, err := url.Parse(os.ExpandEnv(*argMSBUrl))
+ if err != nil {
+ return "", fmt.Errorf("failed to parse --msb_url %s - %v", *argMSBUrl, err)
+ }
+ if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
+ return "", fmt.Errorf("invalid --msb_url specified %s", *argMSBUrl)
+ }
+ return parsedUrl.String(), nil
+}
+
+func newMSBClient() (Client, error) {
+ msbUrl, err := getMSBUrl()
+ if err != nil {
+ return nil, err
+ }
+
+ client, err := newMSBAgent(msbUrl)
+ if err != nil {
+ return nil, err
+ }
+ return client, nil
+}
+
+func getKubeMasterUrl() (string, error) {
+ if *argKubeMasterUrl == "" {
+ return "", fmt.Errorf("no --kube_master_url specified")
+ }
+ parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl))
+ if err != nil {
+ return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err)
+ }
+ if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
+ return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl)
+ }
+ return parsedUrl.String(), nil
+}
+
+func newKubeClient() (*kclient.Client, error) {
+ masterUrl, err := getKubeMasterUrl()
+ if err != nil {
+ return nil, err
+ }
+ overrides := &kclientcmd.ConfigOverrides{}
+ overrides.ClusterInfo.Server = masterUrl
+
+ rules := kclientcmd.NewDefaultClientConfigLoadingRules()
+ kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
+
+ if err != nil {
+ log.Println("Error creating Kube Config", err)
+ return nil, err
+ }
+ kubeConfig.Host = masterUrl
+
+ log.Printf("Using %s for kubernetes master", kubeConfig.Host)
+ return kclient.New(kubeConfig)
+}
+
+// Returns a cache.ListWatch that gets all changes to services.
+func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
+ return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything())
+}
+
+func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) {
+ if service, ok := serviceObj.(*kapi.Service); ok {
+ log.Println("Service Action: ", action, " for service ", service.Name)
+ queue <- KubeWork{
+ Action: action,
+ Service: service,
+ }
+ }
+}
+
+func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) {
+ _, svcController := kframework.NewInformer(
+ createServiceLW(kubeClient),
+ &kapi.Service{},
+ resyncPeriod,
+ kframework.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ sendServiceWork(KubeWorkAddService, queue, obj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ sendServiceWork(KubeWorkRemoveService, queue, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ if reflect.DeepEqual(newObj, oldObj) == false {
+ sendServiceWork(KubeWorkUpdateService, queue, newObj)
+ }
+ },
+ },
+ )
+ stop := make(chan struct{})
+ go svcController.Run(stop)
+}
+
+// Returns a cache.ListWatch that gets all changes to Pods.
+func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
+ return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything())
+}
+
+// Dispatch the notifications for Pods by type to the worker
+func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) {
+ if pod, ok := podObj.(*kapi.Pod); ok {
+ log.Println("Pod Action: ", action, " for Pod:", pod.Name)
+ queue <- KubeWork{
+ Action: action,
+ Pod: pod,
+ }
+ }
+}
+
+// Launch the go routine to watch notifications for Pods.
+func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) {
+ var podController *kframework.Controller
+ _, podController = kframework.NewInformer(
+ createPodLW(kubeClient),
+ &kapi.Pod{},
+ resyncPeriod,
+ kframework.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ sendPodWork(KubeWorkAddPod, queue, obj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ if o, ok := obj.(*kapi.Pod); ok {
+ if _, ok := deleteMap[o.Name]; ok {
+ delete(deleteMap, o.Name)
+ }
+ }
+ sendPodWork(KubeWorkRemovePod, queue, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod)
+ if reflect.DeepEqual(oldObj, newObj) == false {
+ //Adding Pod
+ if _, ok := addMap[n.Name]; ok {
+ if kapi.IsPodReady(n) {
+ delete(addMap, n.Name)
+ sendPodWork(KubeWorkUpdatePod, queue, newObj)
+ }
+ return
+ }
+ //Deleting Pod
+ if _, ok := deleteMap[n.Name]; ok {
+ return
+ } else {
+ if o.ObjectMeta.DeletionTimestamp == nil &&
+ n.ObjectMeta.DeletionTimestamp != nil {
+ deleteMap[n.Name] = n
+ return
+ }
+ //Updating Pod
+ sendPodWork(KubeWorkUpdatePod, queue, newObj)
+ }
+ }
+ },
+ },
+ )
+ stop := make(chan struct{})
+ go podController.Run(stop)
+}
+
+func runBookKeeper(workQue <-chan KubeWork, msbQueue chan<- MSBWork) {
+
+ client := newClientBookKeeper()
+ client.msbQueue = msbQueue
+
+ for work := range workQue {
+ switch work.Action {
+ case KubeWorkAddService:
+ client.AddService(work.Service)
+ case KubeWorkRemoveService:
+ client.RemoveService(work.Service)
+ case KubeWorkUpdateService:
+ client.UpdateService(work.Service)
+ case KubeWorkAddPod:
+ client.AddPod(work.Pod)
+ case KubeWorkRemovePod:
+ client.RemovePod(work.Pod)
+ case KubeWorkUpdatePod:
+ client.UpdatePod(work.Pod)
+ default:
+ log.Println("Unsupported work action: ", work.Action)
+ }
+ }
+ log.Println("Completed all work")
+}
+
+func runMSBWorker(queue <-chan MSBWork, client Client) {
+ worker := newMSBAgentWorker(client)
+
+ for work := range queue {
+ log.Println("MSB Work Action: ", work.Action, " ServiceInfo:", work.ServiceInfo)
+
+ switch work.Action {
+ case MSBWorkAddService:
+ worker.AddService(work.IPAddress, work.ServiceInfo)
+ case MSBWorkRemoveService:
+ worker.RemoveService(work.IPAddress, work.ServiceInfo)
+ case MSBWorkAddPod:
+ worker.AddPod(work.IPAddress, work.ServiceInfo)
+ case MSBWorkRemovePod:
+ worker.RemovePod(work.IPAddress, work.ServiceInfo)
+ default:
+ log.Println("Unsupported Action of: ", work.Action)
+ }
+ }
+}
+
+func main() {
+ flag.Parse()
+ var err error
+
+ msbClient, err := newMSBClient()
+ if err != nil {
+ log.Fatalf("Failed to create MSB client - %v", err)
+ }
+
+ kubeClient, err := newKubeClient()
+ if err != nil {
+ log.Fatalf("Failed to create a kubernetes client: %v", err)
+ }
+
+ if _, err := kubeClient.ServerVersion(); err != nil {
+ log.Fatal("Could not connect to Kube Master", err)
+ } else {
+ log.Println("Connected to K8S API Server")
+ }
+
+ kubeWorkQueue := make(chan KubeWork)
+ msbWorkQueue := make(chan MSBWork)
+ go runBookKeeper(kubeWorkQueue, msbWorkQueue)
+ watchForServices(kubeClient, kubeWorkQueue)
+ watchForPods(kubeClient, kubeWorkQueue)
+ go runMSBWorker(msbWorkQueue, msbClient)
+
+ // Prevent exit
+ select {}
+}
diff --git a/kube2msb/src/kube_work.go b/kube2msb/src/kube_work.go
new file mode 100644
index 0000000..4e99cbd
--- /dev/null
+++ b/kube2msb/src/kube_work.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "log"
+ "sync"
+
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeBookKeeper interface {
+ AddService(*kapi.Service)
+ RemoveService(*kapi.Service)
+ UpdateService(*kapi.Service)
+ AddPod(*kapi.Pod)
+ RemovePod(*kapi.Pod)
+ UpdatePod(*kapi.Pod)
+}
+
+type ClientBookKeeper struct {
+ sync.Mutex
+ KubeBookKeeper
+ services map[string]*kapi.Service
+ pods map[string]*kapi.Pod
+ msbQueue chan<- MSBWork
+}
+
+func newClientBookKeeper() *ClientBookKeeper {
+ return &ClientBookKeeper{
+ services: make(map[string]*kapi.Service),
+ pods: make(map[string]*kapi.Pod),
+ }
+}
+
+func (client *ClientBookKeeper) AddService(svc *kapi.Service) {
+ client.Lock()
+ defer client.Unlock()
+ if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this ADD notification for service:", svc.Name)
+ return
+ }
+
+ if _, ok := client.services[svc.Name]; ok {
+ log.Printf("service:%s already exist. skip this ADD notification.", svc.Name)
+ return
+ }
+
+ if kapi.IsServiceIPSet(svc) {
+ if svc.Spec.Type == kapi.ServiceTypeClusterIP || svc.Spec.Type == kapi.ServiceTypeNodePort {
+ log.Printf("Adding %s service:%s", svc.Spec.Type, svc.Name)
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkAddService,
+ ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+ IPAddress: svc.Spec.ClusterIP,
+ }
+ } else if svc.Spec.Type == kapi.ServiceTypeLoadBalancer {
+ log.Println("Adding LoadBalancerIP service:", svc.Name)
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkAddService,
+ ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+ IPAddress: svc.Spec.LoadBalancerIP,
+ }
+ } else {
+ log.Printf("Service Type:%s for Service:%s is not supported", svc.Spec.Type, svc.Name)
+ return
+ }
+ client.services[svc.Name] = svc
+ log.Println("Queued Service to be added: ", svc.Name)
+ } else {
+ // if ClusterIP is not set, do not create a DNS records
+ log.Printf("Skipping dns record for headless service: %s\n", svc.Name)
+ }
+}
+
+func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) {
+ client.Lock()
+ defer client.Unlock()
+ if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this Remove notification for service:", svc.Name)
+ return
+ }
+
+ if _, ok := client.services[svc.Name]; !ok {
+ log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name)
+ return
+ }
+
+ if svc.Spec.Type == kapi.ServiceTypeClusterIP || svc.Spec.Type == kapi.ServiceTypeNodePort {
+ log.Printf("Removing %s service:%s", svc.Spec.Type, svc.Name)
+ //Perform All DNS Removes
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkRemoveService,
+ ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+ IPAddress: svc.Spec.ClusterIP,
+ }
+ } else if svc.Spec.Type == kapi.ServiceTypeLoadBalancer {
+ log.Println("Removing LoadBalancerIP service:", svc.Name)
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkRemoveService,
+ ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+ IPAddress: svc.Spec.LoadBalancerIP,
+ }
+ } else {
+ log.Printf("Service Type:%s for Service:%s is not supported", svc.Spec.Type, svc.Name)
+ return
+ }
+ delete(client.services, svc.Name)
+ log.Println("Queued Service to be removed: ", svc.Name)
+}
+
+func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) {
+ if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this Update notification for service:", svc.Name)
+ return
+ }
+
+ client.RemoveService(svc)
+ client.AddService(svc)
+}
+
+func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) {
+ client.Lock()
+ defer client.Unlock()
+ if _, ok := pod.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this ADD notification for pod:", pod.Name)
+ return
+ }
+
+ if _, ok := client.pods[pod.Name]; ok {
+ log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name)
+ return
+ }
+
+ //newly added Pod
+ if pod.Name == "" || pod.Status.PodIP == "" {
+ log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name)
+ addMap[pod.Name] = pod
+ return
+ }
+
+ //Perform All DNS Adds
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkAddPod,
+ ServiceInfo: pod.Annotations[serviceKey],
+ IPAddress: pod.Status.PodIP,
+ }
+ client.pods[pod.Name] = pod
+ log.Println("Queued Pod to be added: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
+ client.Lock()
+ defer client.Unlock()
+ if _, ok := pod.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this Remove notification for pod:", pod.Name)
+ return
+ }
+
+ if _, ok := client.pods[pod.Name]; !ok {
+ log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name)
+ return
+ }
+ //Perform All DNS Removes
+ client.msbQueue <- MSBWork{
+ Action: MSBWorkRemovePod,
+ ServiceInfo: pod.Annotations[serviceKey],
+ IPAddress: pod.Status.PodIP,
+ }
+ delete(client.pods, pod.Name)
+ log.Println("Queued Pod to be removed: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) {
+ if _, ok := pod.Annotations[serviceKey]; !ok {
+ log.Println("Not the target, skip this Update notification for pod:", pod.Name)
+ return
+ }
+
+ client.RemovePod(pod)
+ client.AddPod(pod)
+}
diff --git a/kube2msb/src/msb_client.go b/kube2msb/src/msb_client.go
new file mode 100644
index 0000000..da0557a
--- /dev/null
+++ b/kube2msb/src/msb_client.go
@@ -0,0 +1,119 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "log"
+ "net/http"
+)
+
+const (
+ urlPrefix = "/api/microservices/v1/services"
+)
+
+type Client interface {
+ Register(string)
+ DeRegister(string)
+}
+
+type MSBAgent struct {
+ Client
+ url string
+}
+
+func newMSBAgent(s string) (*MSBAgent, error) {
+ healthCheckURL := s + urlPrefix + "/health"
+ resp, err := http.Get(healthCheckURL)
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("MSB agent:%s is not available", s)
+ }
+
+ return &MSBAgent{url: s}, nil
+}
+
+func (client *MSBAgent) Register(serviceInfo string) {
+ var (
+ sa = &ServiceAnnotation{}
+ )
+ err := json.Unmarshal([]byte(serviceInfo), sa)
+ if err != nil {
+ log.Printf("Failed to Unmarshal serviceInfo to ServiceAnnotation:%v", err)
+ return
+ }
+
+ su := ServiceAnnotation2ServiceUnit(sa)
+ body, _ := json.Marshal(su)
+ postURL := client.url + urlPrefix
+
+ resp, err := http.Post(postURL, "application/json", bytes.NewReader(body))
+ if err != nil {
+ log.Printf("Failed to do a request:%v", err)
+ return
+ }
+
+ log.Printf("Http request to register service:%s returned code:%d", su.Name, resp.StatusCode)
+}
+
+func (client *MSBAgent) DeRegister(serviceInfo string) {
+ var (
+ sa = &ServiceAnnotation{}
+ )
+
+ err := json.Unmarshal([]byte(serviceInfo), sa)
+ if err != nil {
+ log.Printf("Failed to Unmarshal serviceInfo to ServiceAnnotation:%v", err)
+ return
+ }
+
+ deleteURL := client.url + urlPrefix + "/" + sa.ServiceName + "/version/" + sa.Version + "/nodes/" + sa.IP + "/" + sa.Port
+
+ req, err := http.NewRequest("DELETE", deleteURL, nil)
+ if err != nil {
+ log.Printf("(deleteURL:%s) failed to NewRequest:%v", deleteURL, err)
+ return
+ }
+
+ c := &http.Client{}
+ resp, err := c.Do(req)
+ if err != nil {
+ log.Printf("(deleteURL:%s) failed to do a request:%v", deleteURL, err)
+ return
+ }
+ log.Printf("Http request to deregister service:%s returned code:%d", sa.ServiceName, resp.StatusCode)
+}
+
+func ServiceAnnotation2ServiceUnit(sa *ServiceAnnotation) *ServiceUnit {
+ if sa == nil {
+ return nil
+ }
+
+ return &ServiceUnit{
+ Name: sa.ServiceName,
+ Version: sa.Version,
+ URL: sa.URL,
+ Protocol: sa.Protocol,
+ LBPolicy: sa.LBPolicy,
+ VisualRange: sa.VisualRange,
+ Instances: []InstanceUnit{{ServiceIP: sa.IP, ServicePort: sa.Port}},
+ }
+}
diff --git a/kube2msb/src/msb_work.go b/kube2msb/src/msb_work.go
new file mode 100644
index 0000000..5c40bae
--- /dev/null
+++ b/kube2msb/src/msb_work.go
@@ -0,0 +1,97 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "log"
+ "strings"
+ "sync"
+)
+
+type MSBWorker interface {
+ AddService(string, string)
+ RemoveService(string)
+ AddPod(string, string)
+ RemovePod(string)
+}
+
+type MSBAgentWorker struct {
+ sync.Mutex
+ MSBWorker
+ agent Client
+}
+
+func newMSBAgentWorker(client Client) *MSBAgentWorker {
+ return &MSBAgentWorker{
+ agent: client,
+ }
+}
+
+func (client *MSBAgentWorker) AddService(ip, sInfo string) {
+ client.Lock()
+ defer client.Unlock()
+
+ if ip == "" || sInfo == "" {
+ log.Println("Service Info is not valid for AddService")
+ return
+ }
+
+ client.agent.Register(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) RemoveService(ip, sInfo string) {
+ client.Lock()
+ defer client.Unlock()
+
+ if sInfo == "" {
+ log.Println("Service Info is not valid for RemoveService")
+ return
+ }
+
+ client.agent.DeRegister(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) AddPod(ip, sInfo string) {
+ client.Lock()
+ defer client.Unlock()
+ if ip == "" || sInfo == "" {
+ log.Println("Service Info is not valid for AddPod")
+ return
+ }
+
+ client.agent.Register(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) RemovePod(ip, sInfo string) {
+ client.Lock()
+ defer client.Unlock()
+ if sInfo == "" {
+ log.Println("Service Info is not valid for RemovePod")
+ return
+ }
+
+ client.agent.DeRegister(mergeIP(ip, sInfo))
+}
+
+func mergeIP(ip, sInfo string) string {
+ insert := "{\"ip\":\"" + ip + "\","
+ parts := strings.Split(sInfo, "{")
+ out := parts[0]
+ for i := 1; i < len(parts); i++ {
+ out += insert + parts[i]
+ }
+ return out
+}
diff --git a/kube2msb/src/types.go b/kube2msb/src/types.go
new file mode 100644
index 0000000..caaf34a
--- /dev/null
+++ b/kube2msb/src/types.go
@@ -0,0 +1,135 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// types.go
+package main
+
+import (
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeWorkAction string
+
+const (
+ KubeWorkAddService KubeWorkAction = "AddService"
+ KubeWorkRemoveService KubeWorkAction = "RemoveService"
+ KubeWorkUpdateService KubeWorkAction = "UpdateService"
+ KubeWorkAddPod KubeWorkAction = "AddPod"
+ KubeWorkRemovePod KubeWorkAction = "RemovePod"
+ KubeWorkUpdatePod KubeWorkAction = "UpdatePod"
+)
+
+type KubeWork struct {
+ Action KubeWorkAction
+ Service *kapi.Service
+ Pod *kapi.Pod
+}
+
+type MSBWorkAction string
+
+const (
+ MSBWorkAddService MSBWorkAction = "AddService"
+ MSBWorkRemoveService MSBWorkAction = "RemoveService"
+ MSBWorkAddPod MSBWorkAction = "AddPod"
+ MSBWorkRemovePod MSBWorkAction = "RemovePod"
+)
+
+type MSBWork struct {
+ Action MSBWorkAction
+ ServiceInfo string
+ IPAddress string
+}
+
+const serviceKey = "msb.onap.org/service-info"
+
+type ServiceUnit struct {
+ Name string `json:"serviceName,omitempty"`
+ Version string `json:"version"`
+ URL string `json:"url"`
+ Protocol string `json:"protocol"`
+ VisualRange string `json:"visualRange"`
+ LBPolicy string `json:"lb_policy"`
+ PublishPort string `json:"publish_port"`
+ Namespace string `json:"namespace"`
+ NWPlaneType string `json:"network_plane_type"`
+ Host string `json:"host"`
+ SubDomain string `json:"subdomain,omitempty"`
+ Path string `json:"path"`
+ Instances []InstanceUnit `json:"nodes"`
+ Metadata []MetaUnit `json:"metadata"`
+ Labels []string `json:"labels"`
+ SwaggerURL string `json:"swagger_url,omitempty"`
+ IsManual bool `json:"is_manual"`
+ EnableSSL bool `json:"enable_ssl"`
+ EnableTLS bool `json:"enable_tls"`
+ EnableReferMatch string `json:"enable_refer_match"`
+ ProxyRule Rules `json:"proxy_rule,omitempty"`
+ RateLimiting RateLimit `json:"rate_limiting,omitempty"`
+}
+
+type InstanceUnit struct {
+ ServiceIP string `json:"ip,omitempty"`
+ ServicePort string `json:"port,omitempty"`
+ LBServerParams string `json:"lb_server_params,omitempty"`
+ CheckType string `json:"checkType,omitempty"`
+ CheckURL string `json:"checkUrl,omitempty"`
+ CheckInterval string `json:"checkInterval,omitempty"`
+ CheckTTL string `json:"ttl,omitempty"`
+ CheckTimeOut string `json:"checkTimeOut,omitempty"`
+ HaRole string `json:"ha_role,omitempty"`
+ ServiceID string `json:"nodeId,omitempty"`
+ ServiceStatus string `json:"status,omitempty"`
+ APPVersion string `json:"appversion,omitempty"`
+}
+
+type MetaUnit struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+}
+
+type Rules struct {
+ HTTPProxy HTTPProxyRule `json:"http_proxy,omitempty"`
+ StreamProxy StreamProxyRule `json:"stream_proxy,omitempty"`
+}
+
+type HTTPProxyRule struct {
+ SendTimeout string `json:"send_timeout,omitempty"`
+ ReadTimeout string `json:"read_timeout,omitempty"`
+}
+
+type StreamProxyRule struct {
+ ProxyTimeout string `json:"proxy_timeout,omitempty"`
+ ProxyResponse string `json:"proxy_responses,omitempty"`
+}
+
+type RateLimit struct {
+ LimitReq LimitRequest `json:"limit_req,omitempty"`
+}
+
+type LimitRequest struct {
+ Rate string `json:"rate,omitempty"`
+ Burst string `json:"burst,omitempty"`
+}
+
+type ServiceAnnotation struct {
+ IP string `json:"ip,omitempty"`
+ Port string `json:"port,omitempty"`
+ ServiceName string `json:"serviceName,omitempty"`
+ Version string `json:"version,omitempty"`
+ URL string `json:"url,omitempty"`
+ Protocol string `json:"protocol,omitempty"`
+ LBPolicy string `json:"lb_policy,omitempty"`
+ VisualRange string `json:"visualRange,omitempty"`
+}