diff options
Diffstat (limited to 'src/kube2msb/kube2msb.go')
-rw-r--r-- | src/kube2msb/kube2msb.go | 288 |
1 files changed, 288 insertions, 0 deletions
diff --git a/src/kube2msb/kube2msb.go b/src/kube2msb/kube2msb.go new file mode 100644 index 0000000..627405e --- /dev/null +++ b/src/kube2msb/kube2msb.go @@ -0,0 +1,288 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "flag" + "fmt" + "log" + "net/url" + "os" + "reflect" + "time" + + kapi "k8s.io/kubernetes/pkg/api" + kcache "k8s.io/kubernetes/pkg/client/cache" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + kframework "k8s.io/kubernetes/pkg/controller/framework" + kselector "k8s.io/kubernetes/pkg/fields" + klabels "k8s.io/kubernetes/pkg/labels" +) + +var ( + argMSBUrl = flag.String("msb_url", "", "URL to MSB backend") + argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.") + addMap = make(map[string]*kapi.Pod) + deleteMap = make(map[string]*kapi.Pod) + nodeSelector = klabels.Everything() +) + +const resyncPeriod = 5 * time.Second + +func getMSBUrl() (string, error) { + if *argMSBUrl == "" { + return "", fmt.Errorf("no --msb_url specified") + } + parsedUrl, err := url.Parse(os.ExpandEnv(*argMSBUrl)) + if err != nil { + return "", fmt.Errorf("failed to parse --msb_url %s - %v", *argMSBUrl, err) + } + if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" { + return "", fmt.Errorf("invalid --msb_url specified %s", *argMSBUrl) + } + return parsedUrl.String(), nil +} + +func newMSBClient() (Client, error) { + msbUrl, err := getMSBUrl() + if err != nil { + return nil, err + } + + client, err := newMSBAgent(msbUrl) + if err != nil { + return nil, err + } + return client, nil +} + +func getKubeMasterUrl() (string, error) { + if *argKubeMasterUrl == "" { + return "", fmt.Errorf("no --kube_master_url specified") + } + parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl)) + if err != nil { + return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err) + } + if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" { + return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl) + } + return parsedUrl.String(), nil +} + +func newKubeClient() (*kclient.Client, error) { + masterUrl, err := getKubeMasterUrl() + if err != nil { + return nil, err + } + overrides := &kclientcmd.ConfigOverrides{} + overrides.ClusterInfo.Server = masterUrl + + rules := kclientcmd.NewDefaultClientConfigLoadingRules() + kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() + + if err != nil { + log.Println("Error creating Kube Config", err) + return nil, err + } + kubeConfig.Host = masterUrl + + log.Printf("Using %s for kubernetes master", kubeConfig.Host) + return kclient.New(kubeConfig) +} + +// Returns a cache.ListWatch that gets all changes to services. +func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything()) +} + +func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) { + if service, ok := serviceObj.(*kapi.Service); ok { + log.Println("Service Action: ", action, " for service ", service.Name) + queue <- KubeWork{ + Action: action, + Service: service, + } + } +} + +func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) { + _, svcController := kframework.NewInformer( + createServiceLW(kubeClient), + &kapi.Service{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sendServiceWork(KubeWorkAddService, queue, obj) + }, + DeleteFunc: func(obj interface{}) { + sendServiceWork(KubeWorkRemoveService, queue, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if reflect.DeepEqual(newObj, oldObj) == false { + sendServiceWork(KubeWorkUpdateService, queue, newObj) + } + }, + }, + ) + stop := make(chan struct{}) + go svcController.Run(stop) +} + +// Returns a cache.ListWatch that gets all changes to Pods. +func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything()) +} + +// Dispatch the notifications for Pods by type to the worker +func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) { + if pod, ok := podObj.(*kapi.Pod); ok { + log.Println("Pod Action: ", action, " for Pod:", pod.Name) + queue <- KubeWork{ + Action: action, + Pod: pod, + } + } +} + +// Launch the go routine to watch notifications for Pods. +func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) { + var podController *kframework.Controller + _, podController = kframework.NewInformer( + createPodLW(kubeClient), + &kapi.Pod{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sendPodWork(KubeWorkAddPod, queue, obj) + }, + DeleteFunc: func(obj interface{}) { + if o, ok := obj.(*kapi.Pod); ok { + if _, ok := deleteMap[o.Name]; ok { + delete(deleteMap, o.Name) + } + } + sendPodWork(KubeWorkRemovePod, queue, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod) + if reflect.DeepEqual(oldObj, newObj) == false { + //Adding Pod + if _, ok := addMap[n.Name]; ok { + if kapi.IsPodReady(n) { + delete(addMap, n.Name) + sendPodWork(KubeWorkUpdatePod, queue, newObj) + } + return + } + //Deleting Pod + if _, ok := deleteMap[n.Name]; ok { + return + } else { + if o.ObjectMeta.DeletionTimestamp == nil && + n.ObjectMeta.DeletionTimestamp != nil { + deleteMap[n.Name] = n + return + } + //Updating Pod + sendPodWork(KubeWorkUpdatePod, queue, newObj) + } + } + }, + }, + ) + stop := make(chan struct{}) + go podController.Run(stop) +} + +func runBookKeeper(workQue <-chan KubeWork, msbQueue chan<- MSBWork) { + + client := newClientBookKeeper() + client.msbQueue = msbQueue + + for work := range workQue { + switch work.Action { + case KubeWorkAddService: + client.AddService(work.Service) + case KubeWorkRemoveService: + client.RemoveService(work.Service) + case KubeWorkUpdateService: + client.UpdateService(work.Service) + case KubeWorkAddPod: + client.AddPod(work.Pod) + case KubeWorkRemovePod: + client.RemovePod(work.Pod) + case KubeWorkUpdatePod: + client.UpdatePod(work.Pod) + default: + log.Println("Unsupported work action: ", work.Action) + } + } + log.Println("Completed all work") +} + +func runMSBWorker(queue <-chan MSBWork, client Client) { + worker := newMSBAgentWorker(client) + + for work := range queue { + log.Println("MSB Work Action: ", work.Action, " ServiceInfo:", work.ServiceInfo) + + switch work.Action { + case MSBWorkAddService: + worker.AddService(work.IPAddress, work.ServiceInfo) + case MSBWorkRemoveService: + worker.RemoveService(work.IPAddress, work.ServiceInfo) + case MSBWorkAddPod: + worker.AddPod(work.IPAddress, work.ServiceInfo) + case MSBWorkRemovePod: + worker.RemovePod(work.IPAddress, work.ServiceInfo) + default: + log.Println("Unsupported Action of: ", work.Action) + } + } +} + +func main() { + flag.Parse() + var err error + + msbClient, err := newMSBClient() + if err != nil { + log.Fatalf("Failed to create MSB client - %v", err) + } + + kubeClient, err := newKubeClient() + if err != nil { + log.Fatalf("Failed to create a kubernetes client: %v", err) + } + + if _, err := kubeClient.ServerVersion(); err != nil { + log.Fatal("Could not connect to Kube Master", err) + } else { + log.Println("Connected to K8S API Server") + } + + kubeWorkQueue := make(chan KubeWork) + msbWorkQueue := make(chan MSBWork) + go runBookKeeper(kubeWorkQueue, msbWorkQueue) + watchForServices(kubeClient, kubeWorkQueue) + watchForPods(kubeClient, kubeWorkQueue) + go runMSBWorker(msbWorkQueue, msbClient) + + // Prevent exit + select {} +} |