diff options
Diffstat (limited to 'src/orchestrator/pkg/infra/db/mongo.go')
-rw-r--r-- | src/orchestrator/pkg/infra/db/mongo.go | 238 |
1 files changed, 202 insertions, 36 deletions
diff --git a/src/orchestrator/pkg/infra/db/mongo.go b/src/orchestrator/pkg/infra/db/mongo.go index 32d0b549..30eb899f 100644 --- a/src/orchestrator/pkg/infra/db/mongo.go +++ b/src/orchestrator/pkg/infra/db/mongo.go @@ -17,7 +17,9 @@ package db import ( + "encoding/json" "log" + "sort" "golang.org/x/net/context" @@ -41,8 +43,12 @@ type MongoCollection interface { update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) + DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) + UpdateOne(ctx context.Context, filter interface{}, update interface{}, + opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) } // MongoStore is an implementation of the db.Store interface @@ -339,58 +345,218 @@ func (m *MongoStore) Delete(coll string, key Key, tag string) error { return nil } -// ReadAll is used to get all documents in db of a particular tag -func (m *MongoStore) ReadAll(coll, tag string) (map[string][]byte, error) { - if !m.validateParams(coll, tag) { - return nil, pkgerrors.New("Missing collection or tag name") +func (m *MongoStore) findFilter(key Key) (primitive.M, error) { + + var bsonMap bson.M + st, err := json.Marshal(key) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Marshalling key: %s", err.Error()) + } + err = json.Unmarshal([]byte(st), &bsonMap) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Unmarshalling key to Bson Map: %s", err.Error()) + } + filter := bson.M{ + "$and": []bson.M{bsonMap}, + } + return filter, nil +} + +func (m *MongoStore) findFilterWithKey(key Key) (primitive.M, error) { + + var bsonMap bson.M + var bsonMapFinal bson.M + st, err := json.Marshal(key) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Marshalling key: %s", err.Error()) + } + err = json.Unmarshal([]byte(st), &bsonMap) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Unmarshalling key to Bson Map: %s", err.Error()) + } + bsonMapFinal = make(bson.M) + for k, v := range bsonMap { + if v == "" { + if _, ok := bsonMapFinal["key"]; !ok { + // add type of key to filter + s, err := m.createKeyField(key) + if err != nil { + return primitive.M{}, err + } + bsonMapFinal["key"] = s + } + } else { + bsonMapFinal[k] = v + } + } + filter := bson.M{ + "$and": []bson.M{bsonMapFinal}, + } + return filter, nil +} + +func (m *MongoStore) updateFilter(key interface{}) (primitive.M, error) { + + var n map[string]string + st, err := json.Marshal(key) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Marshalling key: %s", err.Error()) + } + err = json.Unmarshal([]byte(st), &n) + if err != nil { + return primitive.M{}, pkgerrors.Errorf("Error Unmarshalling key to Bson Map: %s", err.Error()) + } + p := make(bson.M, len(n)) + for k, v := range n { + p[k] = v + } + filter := bson.M{ + "$set": p, + } + return filter, nil +} + +func (m *MongoStore) createKeyField(key interface{}) (string, error) { + + var n map[string]string + st, err := json.Marshal(key) + if err != nil { + return "", pkgerrors.Errorf("Error Marshalling key: %s", err.Error()) + } + err = json.Unmarshal([]byte(st), &n) + if err != nil { + return "", pkgerrors.Errorf("Error Unmarshalling key to Bson Map: %s", err.Error()) + } + var keys []string + for k := range n { + keys = append(keys, k) + } + sort.Strings(keys) + s := "{" + for _, k := range keys { + s = s + k + "," + } + s = s + "}" + return s, nil +} + +// Insert is used to insert/add element to a document +func (m *MongoStore) Insert(coll string, key Key, query interface{}, tag string, data interface{}) error { + if data == nil || !m.validateParams(coll, key, tag) { + return pkgerrors.New("No Data to store") } c := getCollection(coll, m) ctx := context.Background() - //Get all master tables in this collection - filter := bson.D{ - {"key", bson.D{ - {"$exists", true}, - }}, + filter, err := m.findFilter(key) + if err != nil { + return err } - cursor, err := c.Find(ctx, filter) + // Create and add key tag + s, err := m.createKeyField(key) if err != nil { - return nil, pkgerrors.Errorf("Error reading from database: %s", err.Error()) + return err } - defer cursorClose(ctx, cursor) + _, err = decodeBytes( + c.FindOneAndUpdate( + ctx, + filter, + bson.D{ + {"$set", bson.D{ + {tag, data}, + {"key", s}, + }}, + }, + options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After))) - //Iterate over all the master tables - result := make(map[string][]byte) - for cursorNext(ctx, cursor) { - d := cursor.Current + if err != nil { + return pkgerrors.Errorf("Error updating master table: %s", err.Error()) + } + if query == nil { + return nil + } - //Read key of each master table - key, ok := d.Lookup("key").DocumentOK() - if !ok { - //Throw error if key is not found - pkgerrors.New("Unable to read key from mastertable") - } + // Update to add Query fields + update, err := m.updateFilter(query) + if err != nil { + return err + } + _, err = c.UpdateOne( + ctx, + filter, + update) - //Get objectID of tag document - tid, ok := d.Lookup(tag).ObjectIDOK() - if !ok { - log.Printf("Did not find tag: %s", tag) - continue - } + if err != nil { + return pkgerrors.Errorf("Error updating Query fields: %s", err.Error()) + } + return nil +} - //Find tag document and unmarshal it into []byte - tagData, err := decodeBytes(c.FindOne(ctx, bson.D{{"_id", tid}})) - if err != nil { - log.Printf("Unable to decode tag data %s", err.Error()) - continue - } - result[key.String()] = tagData.Lookup(tag).Value +// Find method returns the data stored for this key and for this particular tag +func (m *MongoStore) Find(coll string, key Key, tag string) ([][]byte, error) { + + //result, err := m.findInternal(coll, key, tag, "") + //return result, err + if !m.validateParams(coll, key, tag) { + return nil, pkgerrors.New("Mandatory fields are missing") } + c := getCollection(coll, m) + ctx := context.Background() + + filter, err := m.findFilterWithKey(key) + if err != nil { + return nil, err + } + // Find only the field requested + projection := bson.D{ + {tag, 1}, + {"_id", 0}, + } + + cursor, err := c.Find(context.Background(), filter, options.Find().SetProjection(projection)) + if err != nil { + return nil, pkgerrors.Errorf("Error finding element: %s", err.Error()) + } + defer cursorClose(ctx, cursor) + var data []byte + var result [][]byte + for cursorNext(ctx, cursor) { + d := cursor.Current + switch d.Lookup(tag).Type { + case bson.TypeString: + data = []byte(d.Lookup(tag).StringValue()) + default: + r, err := d.LookupErr(tag) + if err != nil { + // Throw error if not found + pkgerrors.New("Unable to read data ") + } + data = r.Value + } + result = append(result, data) + } if len(result) == 0 { return result, pkgerrors.Errorf("Did not find any objects with tag: %s", tag) } - return result, nil } + +// Remove method to remove the documet by key +func (m *MongoStore) Remove(coll string, key Key) error { + if !m.validateParams(coll, key) { + return pkgerrors.New("Mandatory fields are missing") + } + c := getCollection(coll, m) + ctx := context.Background() + filter, err := m.findFilterWithKey(key) + if err != nil { + return err + } + _, err = c.DeleteMany(ctx, filter) + if err != nil { + return pkgerrors.Errorf("Error Deleting from database: %s", err.Error()) + } + return nil +} |