author     Lukasz Rajewski <lukasz.rajewski@orange.com>   2022-02-15 22:39:37 +0100
committer  Lukasz Rajewski <lukasz.rajewski@orange.com>   2022-03-02 22:46:03 +0100
commit     5b18db4fc784763402e0898bf5e996886279347e (patch)
tree       984a315638e1ef87841144fbb6a7e56484ffd12c /src/k8splugin/internal
parent     a73b42b9c3877f1a34939d85941482f7f5c44db9 (diff)
Implementation of status notification mechanism (tag: 0.10.0)

- Subscription CRUD endpoints
- Subscription notify executor
- Cleanup of subscriptions on instance delete
- Sending notification to the specified callback

Issue-ID: MULTICLOUD-1445
Signed-off-by: Lukasz Rajewski <lukasz.rajewski@orange.com>
Change-Id: I5b867a348e916f6c2c471bcc5326c831d832f45e
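For readers of this change, the callback contract is easiest to see from the receiving side. The sketch below is a minimal, hypothetical receiver for the notification POST: the top-level field names mirror notifyRequestPayload and resourceStatusDelta added in subscription.go further down, while the handler path and port are purely illustrative and resource entries are kept generic so no assumption is made about their exact serialization.

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// notification mirrors the JSON body POSTed to the subscription's callback URL
// (see notifyRequestPayload / resourceStatusDelta in subscription.go below).
type notification struct {
	InstanceID   string                 `json:"instance-id"`
	Subscription string                 `json:"subscription-name"`
	Metadata     map[string]interface{} `json:"metadata"`
	Changes      struct {
		Created  []map[string]interface{} `json:"created"`
		Deleted  []map[string]interface{} `json:"deleted"`
		Modified []map[string]interface{} `json:"modified"`
	} `json:"resource-changes"`
}

func main() {
	// Hypothetical endpoint; its URL would be passed as "callback-url" when
	// the subscription is created.
	http.HandleFunc("/k8splugin-status", func(w http.ResponseWriter, r *http.Request) {
		var n notification
		if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		log.Printf("instance %s / subscription %s: %d created, %d modified, %d deleted",
			n.InstanceID, n.Subscription,
			len(n.Changes.Created), len(n.Changes.Modified), len(n.Changes.Deleted))
		// The HTTP status code returned here is stored by the plugin as the
		// subscription's last-notify-status.
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe(":9999", nil))
}
```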
Diffstat (limited to 'src/k8splugin/internal')
-rw-r--r--  src/k8splugin/internal/app/client.go        |  17
-rw-r--r--  src/k8splugin/internal/app/instance.go      |  31
-rw-r--r--  src/k8splugin/internal/app/subscription.go  | 752
3 files changed, 790 insertions, 10 deletions
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go
index 3aabda22..cbd3dd56 100644
--- a/src/k8splugin/internal/app/client.go
+++ b/src/k8splugin/internal/app/client.go
@@ -28,6 +28,7 @@ import (
//appsv1beta2 "k8s.io/api/apps/v1beta2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
//extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
//apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -53,9 +54,11 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/dynamic"
+ "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
+ "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
@@ -640,3 +643,17 @@ func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
func (k *KubernetesClient) GetInstanceID() string {
return k.instanceID
}
+
+func (k *KubernetesClient) GetInformer(gvk schema.GroupVersionKind) (cache.SharedInformer, error) {
+ labelOptions := dynamicinformer.TweakListOptionsFunc(func(opts *metav1.ListOptions) {
+ opts.LabelSelector = config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID
+ })
+
+ factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.GetDynamicClient(), 0, v1.NamespaceAll, labelOptions)
+ mapping, err := k.GetMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
+ if err != nil {
+ return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
+ }
+ informer := factory.ForResource(mapping.Resource).Informer()
+ return informer, nil
+}
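The new GetInformer helper is consumed by the watcher setup added in subscription.go (refreshWatchers). Below is a minimal sketch of that call pattern, assuming an already-initialized KubernetesClient; the watchInstancePods helper name is hypothetical and not part of the commit.

```go
package app

import (
	"log"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
)

// watchInstancePods is a hypothetical helper illustrating the GetInformer call
// pattern. k must already have been initialized via k.Init(cloudRegion, instanceID).
func watchInstancePods(k *KubernetesClient, stop chan struct{}) error {
	gvk := schema.FromAPIVersionAndKind("v1", "Pod")

	// The informer is label-filtered, so only resources tagged with this
	// client's instance ID generate events.
	informer, err := k.GetInformer(gvk)
	if err != nil {
		return err
	}

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { log.Println("pod added") },
		UpdateFunc: func(oldObj, obj interface{}) { log.Println("pod updated") },
		DeleteFunc: func(obj interface{}) { log.Println("pod deleted") },
	})

	// Run blocks until stop is closed, so it is started in its own goroutine.
	go informer.Run(stop)
	return nil
}
```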
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index e78eea77..91e2150e 100644
--- a/src/k8splugin/internal/app/instance.go
+++ b/src/k8splugin/internal/app/instance.go
@@ -121,7 +121,7 @@ type InstanceManager interface {
Upgrade(id string, u UpgradeRequest) (InstanceResponse, error)
Get(id string) (InstanceResponse, error)
GetFull(id string) (InstanceDbData, error)
- Status(id string) (InstanceStatus, error)
+ Status(id string, checkReady bool) (InstanceStatus, error)
Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
@@ -813,7 +813,7 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta
}
// Status returns the status for the instance
-func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
+func (v *InstanceClient) Status(id string, checkReady bool) (InstanceStatus, error) {
//Read the status from the DB
key := InstanceKey{
ID: id,
@@ -867,12 +867,14 @@ Main:
isReady = false
} else {
generalStatus = append(generalStatus, status)
- ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
+ if checkReady {
+ ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
- if !ready || err != nil {
- isReady = false
- if err != nil {
- cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+ if !ready || err != nil {
+ isReady = false
+ if err != nil {
+ cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+ }
}
}
}
@@ -905,7 +907,7 @@ Main:
resp := InstanceStatus{
Request: resResp.Request,
ResourceCount: int32(len(generalStatus)),
- Ready: isReady && resResp.Status == "DONE",
+ Ready: checkReady && isReady && resResp.Status == "DONE",
ResourcesStatus: generalStatus,
}
@@ -914,7 +916,6 @@ Main:
strings.Join(cumulatedErrorMsg, "\n"))
return resp, err
}
- //TODO Filter response content by requested verbosity (brief, ...)?
return resp, nil
}
@@ -925,7 +926,6 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K
defer cancel()
apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
- log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
var parsedRes runtime.Object
//TODO: Should we care about different api version for a same kind?
@@ -1143,6 +1143,12 @@ func (v *InstanceClient) Delete(id string) error {
}
}()
} else {
+ subscriptionClient := NewInstanceStatusSubClient()
+ err = subscriptionClient.Cleanup(id)
+ if err != nil {
+ log.Printf(err.Error())
+ }
+
err = db.DBconn.Delete(v.storeName, key, v.tagInst)
if err != nil {
return pkgerrors.Wrap(err, "Delete Instance")
@@ -1288,6 +1294,11 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
return pkgerrors.Wrap(err, "Error running post-delete hooks")
}
if clearDb {
+ subscriptionClient := NewInstanceStatusSubClient()
+ err = subscriptionClient.Cleanup(instance.ID)
+ if err != nil {
+ log.Printf(err.Error())
+ }
err = db.DBconn.Delete(v.storeName, key, v.tagInst)
if err != nil {
log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName)
diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go
new file mode 100644
index 00000000..9b4a1aaf
--- /dev/null
+++ b/src/k8splugin/internal/app/subscription.go
@@ -0,0 +1,752 @@
+/*
+ * Copyright © 2022 Orange
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+ "bytes"
+ "encoding/json"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
+ pkgerrors "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/tools/cache"
+)
+
+// StatusSubscription is what is returned when a status subscription is queried for an instance
+type StatusSubscription struct {
+ Name string `json:"name"`
+ MinNotifyInterval int32 `json:"min-notify-interval"`
+ LastUpdateTime time.Time `json:"last-update-time"`
+ CallbackUrl string `json:"callback-url"`
+ LastNotifyTime time.Time `json:"last-notify-time"`
+ LastNotifyStatus int32 `json:"last-notify-status"`
+ NotifyMetadata map[string]interface{} `json:"metadata"`
+}
+
+type SubscriptionRequest struct {
+ Name string `json:"name"`
+ MinNotifyInterval int32 `json:"min-notify-interval"`
+ NotifyMetadata map[string]interface{} `json:"metadata"`
+ CallbackUrl string `json:"callback-url"`
+}
+
+// StatusSubscriptionKey is used as the primary key in the db
+type StatusSubscriptionKey struct {
+ InstanceId string `json:"instanceid"`
+ SubscriptionName string `json:"subscriptionname"`
+}
+
+// We will use json marshalling to convert to string to
+// preserve the underlying structure.
+func (dk StatusSubscriptionKey) String() string {
+ out, err := json.Marshal(dk)
+ if err != nil {
+ return ""
+ }
+
+ return string(out)
+}
+
+// InstanceStatusSubClient implements InstanceStatusSubManager
+type InstanceStatusSubClient struct {
+ storeName string
+ tagInst string
+}
+
+func NewInstanceStatusSubClient() *InstanceStatusSubClient {
+ return &InstanceStatusSubClient{
+ storeName: "rbdef",
+ tagInst: "instanceStatusSub",
+ }
+}
+
+type notifyResult struct {
+ result int32
+ time time.Time
+}
+
+type resourceStatusDelta struct {
+ Created []ResourceStatus `json:"created"`
+ Deleted []ResourceStatus `json:"deleted"`
+ Modified []ResourceStatus `json:"modified"`
+}
+
+type notifyRequestPayload struct {
+ InstanceId string `json:"instance-id"`
+ Subscription string `json:"subscription-name"`
+ Metadata map[string]interface{} `json:"metadata"`
+ Delta resourceStatusDelta `json:"resource-changes"`
+}
+
+func (rsd resourceStatusDelta) Delta() bool {
+ return len(rsd.Created) > 0 || len(rsd.Deleted) > 0 || len(rsd.Modified) > 0
+}
+
+type notifyChannelData struct {
+ instanceId string
+ subscription StatusSubscription
+ action string
+ delta resourceStatusDelta
+ notifyResult chan notifyResult
+}
+
+type subscriptionWatch struct {
+ watcherStop chan struct{}
+ lastUpdateTime time.Time
+}
+
+type subscriptionWatchManager struct {
+ watchersStatus map[string]subscriptionWatch
+}
+
+type subscriptionNotifyManager struct {
+ notifyLockMap map[string]*sync.Mutex
+ notifyChannel map[string]chan notifyChannelData
+ watchersStatus map[string]subscriptionWatchManager
+ sync.Mutex
+}
+
+var subscriptionNotifyData = subscriptionNotifyManager{
+ notifyLockMap: map[string]*sync.Mutex{},
+ notifyChannel: map[string]chan notifyChannelData{},
+ watchersStatus: map[string]subscriptionWatchManager{},
+}
+
+// InstanceStatusSubManager is an interface that exposes the status subscription functionality
+type InstanceStatusSubManager interface {
+ Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+ Get(instanceId, subId string) (StatusSubscription, error)
+ Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+ List(instanceId string) ([]StatusSubscription, error)
+ Delete(instanceId, subId string) error
+ Cleanup(instanceId string) error
+ RestoreWatchers()
+}
+
+// Create Status Subscription
+func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+
+ _, err := iss.Get(instanceId, subDetails.Name)
+ if err == nil {
+ return StatusSubscription{}, pkgerrors.New("Subscription already exists")
+ }
+
+ lock, _, _ := getSubscriptionData(instanceId)
+
+ key := StatusSubscriptionKey{
+ InstanceId: instanceId,
+ SubscriptionName: subDetails.Name,
+ }
+
+ sub := StatusSubscription{
+ Name: subDetails.Name,
+ MinNotifyInterval: subDetails.MinNotifyInterval,
+ LastNotifyStatus: 0,
+ CallbackUrl: subDetails.CallbackUrl,
+ LastUpdateTime: time.Now(),
+ LastNotifyTime: time.Now(),
+ NotifyMetadata: subDetails.NotifyMetadata,
+ }
+ if sub.NotifyMetadata == nil {
+ sub.NotifyMetadata = make(map[string]interface{})
+ }
+
+ err = iss.refreshWatchers(instanceId, subDetails.Name)
+ if err != nil {
+ return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry")
+ }
+
+ lock.Lock()
+ defer lock.Unlock()
+
+ err = db.DBconn.Create(iss.storeName, key, iss.tagInst, sub)
+ if err != nil {
+ return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry")
+ }
+ log.Info("Successfully created Status Subscription", log.Fields{
+ "InstanceId": instanceId,
+ "SubscriptionName": subDetails.Name,
+ })
+
+ go runNotifyThread(instanceId, sub.Name)
+
+ return sub, nil
+}
+
+// Get Status subscription
+func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscription, error) {
+ lock, _, _ := getSubscriptionData(instanceId)
+ // Acquire Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ key := StatusSubscriptionKey{
+ InstanceId: instanceId,
+ SubscriptionName: subId,
+ }
+ DBResp, err := db.DBconn.Read(iss.storeName, key, iss.tagInst)
+ if err != nil || DBResp == nil {
+ return StatusSubscription{}, pkgerrors.Wrap(err, "Error retrieving Subscription data")
+ }
+
+ sub := StatusSubscription{}
+ err = db.DBconn.Unmarshal(DBResp, &sub)
+ if err != nil {
+ return StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling Subscription Value")
+ }
+ return sub, nil
+}
+
+// Update status subscription
+func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+ sub, err := iss.Get(instanceId, subDetails.Name)
+ if err != nil {
+ return StatusSubscription{}, pkgerrors.Wrap(err, "Subscription does not exist")
+ }
+
+ lock, _, _ := getSubscriptionData(instanceId)
+
+ key := StatusSubscriptionKey{
+ InstanceId: instanceId,
+ SubscriptionName: subDetails.Name,
+ }
+
+ sub.MinNotifyInterval = subDetails.MinNotifyInterval
+ sub.CallbackUrl = subDetails.CallbackUrl
+ sub.NotifyMetadata = subDetails.NotifyMetadata
+ if sub.NotifyMetadata == nil {
+ sub.NotifyMetadata = make(map[string]interface{})
+ }
+
+ err = iss.refreshWatchers(instanceId, subDetails.Name)
+ if err != nil {
+ return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry")
+ }
+
+ lock.Lock()
+ defer lock.Unlock()
+
+ err = db.DBconn.Update(iss.storeName, key, iss.tagInst, sub)
+ if err != nil {
+ return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry")
+ }
+ log.Info("Successfully updated Status Subscription", log.Fields{
+ "InstanceId": instanceId,
+ "SubscriptionName": subDetails.Name,
+ })
+
+ return sub, nil
+}
+
+// Get list of status subscriptions
+func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscription, error) {
+
+ lock, _, _ := getSubscriptionData(instanceId)
+ // Acquire Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ // Retrieve info about created status subscriptions
+ dbResp, err := db.DBconn.ReadAll(iss.storeName, iss.tagInst)
+ if err != nil {
+ if !strings.Contains(err.Error(), "Did not find any objects with tag") {
+ return []StatusSubscription{}, pkgerrors.Wrap(err, "Getting Status Subscription data")
+ }
+ }
+ subList := make([]StatusSubscription, 0)
+ for key, value := range dbResp {
+ if key != "" {
+ subKey := StatusSubscriptionKey{}
+ err = json.Unmarshal([]byte(key), &subKey)
+ if err != nil {
+ log.Error("Error demarshaling Status Subscription Key DB data", log.Fields{
+ "error": err.Error(),
+ "key": key})
+ return []StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling subscription key")
+ }
+ if subKey.InstanceId != instanceId {
+ continue
+ }
+ }
+ //value is a byte array
+ if value != nil {
+ sub := StatusSubscription{}
+ err = db.DBconn.Unmarshal(value, &sub)
+ if err != nil {
+ log.Error("Error demarshaling Status Subscription DB data", log.Fields{
+ "error": err.Error(),
+ "key": key})
+ }
+ subList = append(subList, sub)
+ }
+ }
+
+ return subList, nil
+}
+
+// Delete status subscription
+func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error {
+ _, err := iss.Get(instanceId, subId)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Subscription does not exist")
+ }
+ lock, _, watchers := getSubscriptionData(instanceId)
+ // Acquire Mutex
+ lock.Lock()
+ defer lock.Unlock()
+
+ close(watchers.watchersStatus[subId].watcherStop)
+ delete(watchers.watchersStatus, subId)
+
+ key := StatusSubscriptionKey{
+ InstanceId: instanceId,
+ SubscriptionName: subId,
+ }
+ err = db.DBconn.Delete(iss.storeName, key, iss.tagInst)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Removing Status Subscription in DB")
+ }
+ return nil
+}
+
+// Cleanup status subscriptions for instance
+func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error {
+ subList, err := iss.List(instanceId)
+ if err != nil {
+ return err
+ }
+
+ for _, sub := range subList {
+ err = iss.Delete(instanceId, sub.Name)
+ if err != nil {
+ log.Error("Error deleting ", log.Fields{
+ "error": err.Error(),
+ "key": sub.Name})
+ }
+ }
+ removeSubscriptionData(instanceId)
+ return err
+}
+
+// Restore status subscriptions notify threads
+func (iss *InstanceStatusSubClient) RestoreWatchers() {
+ go func() {
+ time.Sleep(time.Second * 10)
+ log.Info("Restoring status subscription notifications", log.Fields{})
+ v := NewInstanceClient()
+ instances, err := v.List("", "", "")
+ if err != nil {
+ log.Error("Error reading instance list", log.Fields{
+ "error": err.Error(),
+ })
+ }
+ for _, instance := range instances {
+ subList, err := iss.List(instance.ID)
+ if err != nil {
+ log.Error("Error reading subscription list for instance", log.Fields{
+ "error": err.Error(),
+ "instance": instance.ID,
+ })
+ continue
+ }
+
+ for _, sub := range subList {
+ err = iss.refreshWatchers(instance.ID, sub.Name)
+ if err != nil {
+ log.Error("Error on refreshing watchers", log.Fields{
+ "error": err.Error(),
+ "instance": instance.ID,
+ "subscription": sub.Name,
+ })
+ continue
+ }
+ go runNotifyThread(instance.ID, sub.Name)
+ }
+ }
+ }()
+}
+
+func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) error {
+ log.Info("REFRESH WATCHERS", log.Fields{
+ "instance": instanceId,
+ "subscription": subId,
+ })
+ v := NewInstanceClient()
+ k8sClient := KubernetesClient{}
+ instance, err := v.Get(instanceId)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Cannot get instance for notify thread")
+ }
+ profile, err := rb.NewProfileClient().Get(instance.Request.RBName, instance.Request.RBVersion,
+ instance.Request.ProfileName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Unable to find Profile instance status")
+ }
+ err = k8sClient.Init(instance.Request.CloudRegion, instanceId)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Cannot set k8s client for instance")
+ }
+
+ lock, _, watchers := getSubscriptionData(instanceId)
+ // Acquire Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ watcher, ok := watchers.watchersStatus[subId]
+ if ok {
+ close(watcher.watcherStop)
+ } else {
+ watchers.watchersStatus[subId] = subscriptionWatch{
+ lastUpdateTime: time.Now(),
+ }
+ }
+
+ watcher.watcherStop = make(chan struct{})
+
+ for _, gvk := range gvkListForInstance(instance, profile) {
+ informer, _ := k8sClient.GetInformer(gvk)
+ handlers := cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ lock.Lock()
+ watcher.lastUpdateTime = time.Now()
+ watchers.watchersStatus[subId] = watcher
+ lock.Unlock()
+ },
+ UpdateFunc: func(oldObj, obj interface{}) {
+ lock.Lock()
+ watcher.lastUpdateTime = time.Now()
+ watchers.watchersStatus[subId] = watcher
+ lock.Unlock()
+ },
+ DeleteFunc: func(obj interface{}) {
+ lock.Lock()
+ watcher.lastUpdateTime = time.Now()
+ watchers.watchersStatus[subId] = watcher
+ lock.Unlock()
+ },
+ }
+ informer.AddEventHandler(handlers)
+ go func(informer cache.SharedInformer, stopper chan struct{}, fields log.Fields) {
+ log.Info("[START] Watcher", fields)
+ informer.Run(stopper)
+ log.Info("[STOP] Watcher", fields)
+ }(informer, watcher.watcherStop, log.Fields{
+ "Kind": gvk.Kind,
+ "Instance": instanceId,
+ "Subscription": subId,
+ })
+ }
+ return nil
+}
+
+// Get the Mutex, notify channel and watcher state for the instance
+func getSubscriptionData(instanceId string) (*sync.Mutex, chan notifyChannelData, subscriptionWatchManager) {
+ var key string = instanceId
+ subscriptionNotifyData.Lock()
+ defer subscriptionNotifyData.Unlock()
+ _, ok := subscriptionNotifyData.notifyLockMap[key]
+ if !ok {
+ subscriptionNotifyData.notifyLockMap[key] = &sync.Mutex{}
+ }
+ _, ok = subscriptionNotifyData.notifyChannel[key]
+ if !ok {
+ subscriptionNotifyData.notifyChannel[key] = make(chan notifyChannelData)
+ go scheduleNotifications(instanceId, subscriptionNotifyData.notifyChannel[key])
+ time.Sleep(time.Second * 5)
+ }
+ _, ok = subscriptionNotifyData.watchersStatus[key]
+ if !ok {
+ subscriptionNotifyData.watchersStatus[key] = subscriptionWatchManager{
+ watchersStatus: make(map[string]subscriptionWatch),
+ }
+ }
+ return subscriptionNotifyData.notifyLockMap[key], subscriptionNotifyData.notifyChannel[key], subscriptionNotifyData.watchersStatus[key]
+}
+
+func removeSubscriptionData(instanceId string) {
+ var key string = instanceId
+ subscriptionNotifyData.Lock()
+ defer subscriptionNotifyData.Unlock()
+ _, ok := subscriptionNotifyData.notifyLockMap[key]
+ if ok {
+ delete(subscriptionNotifyData.notifyLockMap, key)
+ }
+ _, ok = subscriptionNotifyData.notifyChannel[key]
+ if ok {
+ crl := notifyChannelData{
+ instanceId: instanceId,
+ action: "STOP",
+ }
+ subscriptionNotifyData.notifyChannel[key] <- crl
+ delete(subscriptionNotifyData.notifyChannel, key)
+ }
+ _, ok = subscriptionNotifyData.watchersStatus[key]
+ if ok {
+ delete(subscriptionNotifyData.watchersStatus, key)
+ }
+}
+
+// notify request timeout
+func notifyTimeout(network, addr string) (net.Conn, error) {
+ return net.DialTimeout(network, addr, time.Duration(time.Second*5))
+}
+
+// Per-instance goroutine to send notifications about status changes to subscription callbacks
+func scheduleNotifications(instanceId string, c chan notifyChannelData) {
+ // Keep thread running
+ log.Info("[START] status notify thread for ", log.Fields{
+ "instance": instanceId,
+ })
+ for {
+ data := <-c
+ breakThread := false
+ switch {
+ case data.action == "NOTIFY":
+ var result = notifyResult{}
+ var err error = nil
+ var notifyPayload = notifyRequestPayload{
+ Delta: data.delta,
+ InstanceId: data.instanceId,
+ Subscription: data.subscription.Name,
+ Metadata: data.subscription.NotifyMetadata,
+ }
+ notifyBody, err := json.Marshal(notifyPayload)
+ if err == nil {
+ notifyBodyBuffer := bytes.NewBuffer(notifyBody)
+ transport := http.Transport{
+ Dial: notifyTimeout,
+ }
+ client := http.Client{
+ Transport: &transport,
+ }
+ resp, errReq := client.Post(data.subscription.CallbackUrl, "application/json", notifyBodyBuffer)
+ if errReq == nil {
+ result.result = int32(resp.StatusCode)
+ if resp.StatusCode >= 400 {
+ respBody, _ := ioutil.ReadAll(resp.Body)
+ log.Error("Status notification request failed", log.Fields{
+ "instance": instanceId,
+ "name": data.subscription.Name,
+ "url": data.subscription.CallbackUrl,
+ "code": resp.StatusCode,
+ "status": resp.Status,
+ "body": string(respBody),
+ })
+ resp.Body.Close()
+ }
+ } else {
+ err = errReq
+ }
+ }
+
+ if err != nil {
+ log.Error("Error for status notify thread", log.Fields{
+ "instance": instanceId,
+ "name": data.subscription.Name,
+ "err": err.Error(),
+ })
+ result.result = 500
+ }
+ result.time = time.Now()
+
+ data.notifyResult <- result
+
+ case data.action == "STOP":
+ breakThread = true
+ }
+ if breakThread {
+ break
+ }
+ }
+ log.Info("[STOP] status notify thread for ", log.Fields{
+ "instance": instanceId,
+ })
+}
+
+func gvkListForInstance(instance InstanceResponse, profile rb.Profile) []schema.GroupVersionKind {
+ list := make([]schema.GroupVersionKind, 0)
+ gvkMap := make(map[string]schema.GroupVersionKind)
+ gvk := schema.FromAPIVersionAndKind("v1", "Pod")
+ gvkMap[gvk.String()] = gvk
+ for _, res := range instance.Resources {
+ gvk = res.GVK
+ _, ok := gvkMap[gvk.String()]
+ if !ok {
+ gvkMap[gvk.String()] = gvk
+ }
+ }
+ for _, gvk := range profile.ExtraResourceTypes {
+ _, ok := gvkMap[gvk.String()]
+ if !ok {
+ gvkMap[gvk.String()] = gvk
+ }
+ }
+ for _, gvk := range gvkMap {
+ list = append(list, gvk)
+ }
+ return list
+}
+
+func runNotifyThread(instanceId, subName string) {
+ v := NewInstanceClient()
+ iss := NewInstanceStatusSubClient()
+ var status = InstanceStatus{
+ ResourceCount: -1,
+ }
+ key := StatusSubscriptionKey{
+ InstanceId: instanceId,
+ SubscriptionName: subName,
+ }
+ time.Sleep(time.Second * 5)
+ log.Info("[START] status verification thread", log.Fields{
+ "InstanceId": instanceId,
+ "SubscriptionName": subName,
+ })
+
+ lastChange := time.Now()
+ var timeInSeconds time.Duration = 5
+ for {
+ time.Sleep(time.Second * timeInSeconds)
+
+ lock, subData, watchers := getSubscriptionData(instanceId)
+ var changeDetected = false
+ lock.Lock()
+ watcherStatus, ok := watchers.watchersStatus[subName]
+ if ok {
+ changeDetected = watcherStatus.lastUpdateTime.After(lastChange)
+ }
+ lock.Unlock()
+ if !ok {
+ break
+ }
+ if changeDetected || status.ResourceCount < 0 {
+ currentSub, err := iss.Get(instanceId, subName)
+ if err != nil {
+ log.Error("Error getting current status", log.Fields{
+ "error": err.Error(),
+ "instance": instanceId})
+ break
+ }
+ if currentSub.MinNotifyInterval > 5 {
+ timeInSeconds = time.Duration(currentSub.MinNotifyInterval)
+ } else {
+ timeInSeconds = 5
+ }
+ newStatus, err := v.Status(instanceId, false)
+ if err != nil {
+ log.Error("Error getting current status", log.Fields{
+ "error": err.Error(),
+ "instance": instanceId})
+ break
+ } else {
+ if status.ResourceCount >= 0 {
+ var delta = statusDelta(status, newStatus)
+ if delta.Delta() {
+ log.Info("CHANGE DETECTED", log.Fields{
+ "Instance": instanceId,
+ "Subscription": subName,
+ })
+ lastChange = watcherStatus.lastUpdateTime
+ for _, res := range delta.Created {
+ log.Info("CREATED", log.Fields{
+ "Kind": res.GVK.Kind,
+ "Name": res.Name,
+ })
+ }
+ for _, res := range delta.Modified {
+ log.Info("MODIFIED", log.Fields{
+ "Kind": res.GVK.Kind,
+ "Name": res.Name,
+ })
+ }
+ for _, res := range delta.Deleted {
+ log.Info("DELETED", log.Fields{
+ "Kind": res.GVK.Kind,
+ "Name": res.Name,
+ })
+ }
+ // Acquire Mutex
+ lock.Lock()
+ currentSub.LastUpdateTime = time.Now()
+ var notifyResultCh = make(chan notifyResult)
+ var newData = notifyChannelData{
+ instanceId: instanceId,
+ subscription: currentSub,
+ action: "NOTIFY",
+ delta: delta,
+ notifyResult: notifyResultCh,
+ }
+ subData <- newData
+ var notifyResult notifyResult = <-notifyResultCh
+ log.Info("Notification sent", log.Fields{
+ "InstanceId": instanceId,
+ "SubscriptionName": subName,
+ "Result": notifyResult.result,
+ })
+ currentSub.LastNotifyStatus = notifyResult.result
+ currentSub.LastNotifyTime = notifyResult.time
+ err = db.DBconn.Update(iss.storeName, key, iss.tagInst, currentSub)
+ if err != nil {
+ log.Error("Error updating subscription status", log.Fields{
+ "error": err.Error(),
+ "instance": instanceId})
+ }
+ lock.Unlock()
+ }
+ }
+
+ status = newStatus
+ }
+ }
+ }
+ log.Info("[STOP] status verification thread", log.Fields{
+ "InstanceId": instanceId,
+ "SubscriptionName": subName,
+ })
+}
+
+func statusDelta(first, second InstanceStatus) resourceStatusDelta {
+ var delta resourceStatusDelta = resourceStatusDelta{
+ Created: make([]ResourceStatus, 0),
+ Deleted: make([]ResourceStatus, 0),
+ Modified: make([]ResourceStatus, 0),
+ }
+ var firstResList map[string]ResourceStatus = make(map[string]ResourceStatus)
+ for _, res := range first.ResourcesStatus {
+ firstResList[res.Key()] = res
+ }
+ for _, res := range second.ResourcesStatus {
+ var key string = res.Key()
+ if prevRes, ok := firstResList[key]; ok {
+ if prevRes.Value() != res.Value() {
+ delta.Modified = append(delta.Modified, res)
+ }
+ delete(firstResList, res.Key())
+ } else {
+ delta.Created = append(delta.Created, res)
+ }
+ }
+ for _, res := range firstResList {
+ delta.Deleted = append(delta.Deleted, res)
+ }
+ return delta
+}
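One piece the mechanism depends on sits outside src/k8splugin/internal and therefore outside this diffstat: restarting the notify threads after the plugin itself restarts. A minimal sketch, assuming RestoreWatchers is invoked from the plugin's start-up code, of how that wiring could look:

```go
package main

import (
	"github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
)

func main() {
	// RestoreWatchers returns immediately: it starts a goroutine that waits a
	// few seconds, lists every instance and its stored subscriptions, rebuilds
	// the informer-based watchers, and restarts the notify threads so
	// callbacks keep firing after a plugin restart.
	app.NewInstanceStatusSubClient().RestoreWatchers()

	// ... the rest of the plugin start-up (HTTP API, etc.) would follow here.
}
```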