diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rsync/go.mod | 5 | ||||
-rw-r--r-- | src/rsync/pkg/status/status.go | 139 |
2 files changed, 139 insertions, 5 deletions
diff --git a/src/rsync/go.mod b/src/rsync/go.mod index a2c5f83b..c6831e5e 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -5,12 +5,7 @@ go 1.13 require ( github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 - github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect - github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect - github.com/mattn/go-isatty v0.0.4 // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 - go.etcd.io/bbolt v1.3.3 // indirect google.golang.org/grpc v1.27.1 k8s.io/kubernetes v1.14.1 ) diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go new file mode 100644 index 00000000..28bffefd --- /dev/null +++ b/src/rsync/pkg/status/status.go @@ -0,0 +1,139 @@ +/* + * Copyright 2020 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package status + +import ( + "encoding/json" + "fmt" + "sync" + + pkgerrors "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" + informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" +) + +type channelManager struct { + channels map[string]chan struct{} + sync.Mutex +} + +var channelData channelManager + +const monitorLabel = "emco/deployment-id" + +// HandleStatusUpdate for an application in a cluster +// TODO: Add code for specific handling +func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error { + logrus.Info("label::", id) + //status := v.Status.ServiceStatuses + //podStatus := v.Status.PodStatuses + // Store Pod Status in app context + out, _ := json.Marshal(v.Status) + logrus.Info("Status::", string(out)) + return nil +} + +// StartClusterWatcher watches for CR +// configBytes - Kubectl file data +func StartClusterWatcher(provider, name string, configBytes []byte) error { + key := provider + "+" + name + // Get the lock + channelData.Lock() + defer channelData.Unlock() + // For first time + if channelData.channels == nil { + channelData.channels = make(map[string]chan struct{}) + } + _, ok := channelData.channels[key] + if !ok { + // Create Channel + channelData.channels[key] = make(chan struct{}) + // Create config + config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) + if err != nil { + logrus.Info(fmt.Sprintf("RESTConfigFromKubeConfig error: %s", err.Error())) + return pkgerrors.Wrap(err, "RESTConfigFromKubeConfig error") + } + k8sClient, err := clientset.NewForConfig(config) + if err != nil { + return pkgerrors.Wrap(err, "Clientset NewForConfig error") + } + // Create Informer + mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() + go scheduleStatus(provider, name, channelData.channels[key], mInformer) + } + return nil +} + +// StopClusterWatcher stop watching a cluster +func StopClusterWatcher(provider, name string) { + key := provider + "+" + name + if channelData.channels != nil { + c, ok := channelData.channels[key] + if ok { + close(c) + } + } +} + +// CloseAllClusterWatchers close all channels +func CloseAllClusterWatchers() { + if channelData.channels == nil { + return + } + // Close all Channels to stop all watchers + for _, e := range channelData.channels { + close(e) + } +} + +// Per Cluster Go routine to watch CR +func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) { + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(provider, name, l, v) + } + } + }, + UpdateFunc: func(oldObj, obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(provider, name, l, v) + } + } + }, + DeleteFunc: func(obj interface{}) { + // Ignore it + }, + } + s.AddEventHandler(handlers) + s.Run(c) +} |