summaryrefslogtreecommitdiffstats
path: root/src/k8splugin/db/mongo.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/k8splugin/db/mongo.go')
-rw-r--r--src/k8splugin/db/mongo.go323
1 files changed, 323 insertions, 0 deletions
diff --git a/src/k8splugin/db/mongo.go b/src/k8splugin/db/mongo.go
new file mode 100644
index 00000000..311f044c
--- /dev/null
+++ b/src/k8splugin/db/mongo.go
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2018 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package db
+
+import (
+ "github.com/mongodb/mongo-go-driver/bson"
+ "github.com/mongodb/mongo-go-driver/bson/primitive"
+ "github.com/mongodb/mongo-go-driver/mongo"
+ "github.com/mongodb/mongo-go-driver/mongo/options"
+ pkgerrors "github.com/pkg/errors"
+ "golang.org/x/net/context"
+ "log"
+ "os"
+)
+
+// MongoCollection defines the a subset of MongoDB operations
+// Note: This interface is defined mainly for mock testing
+type MongoCollection interface {
+ InsertOne(ctx context.Context, document interface{},
+ opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
+ FindOne(ctx context.Context, filter interface{},
+ opts ...*options.FindOneOptions) *mongo.SingleResult
+ FindOneAndUpdate(ctx context.Context, filter interface{},
+ update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
+ DeleteOne(ctx context.Context, filter interface{},
+ opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
+ Find(ctx context.Context, filter interface{},
+ opts ...*options.FindOptions) (mongo.Cursor, error)
+}
+
+// MongoStore is an implementation of the db.Store interface
+type MongoStore struct {
+ db *mongo.Database
+}
+
+// This exists only for allowing us to mock the collection object
+// for testing purposes
+var getCollection = func(coll string, m *MongoStore) MongoCollection {
+ return m.db.Collection(coll)
+}
+
+// This exists only for allowing us to mock the DecodeBytes function
+// Mainly because we cannot construct a SingleResult struct from our
+// tests. All fields in that struct are private.
+var decodeBytes = func(sr *mongo.SingleResult) (bson.Raw, error) {
+ return sr.DecodeBytes()
+}
+
+// NewMongoStore initializes a Mongo Database with the name provided
+// If a database with that name exists, it will be returned
+func NewMongoStore(name string, store *mongo.Database) (Store, error) {
+ if store == nil {
+ ip := "mongodb://" + os.Getenv("DATABASE_IP") + ":27017"
+ mongoClient, err := mongo.NewClient(ip)
+ if err != nil {
+ return nil, err
+ }
+
+ err = mongoClient.Connect(context.Background())
+ if err != nil {
+ return nil, err
+ }
+ store = mongoClient.Database(name)
+ }
+
+ return &MongoStore{
+ db: store,
+ }, nil
+}
+
+// HealthCheck verifies if the database is up and running
+func (m *MongoStore) HealthCheck() error {
+
+ _, err := decodeBytes(m.db.RunCommand(context.Background(), bson.D{{"serverStatus", 1}}))
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error getting server status")
+ }
+
+ return nil
+}
+
+// validateParams checks to see if any parameters are empty
+func (m *MongoStore) validateParams(args ...string) bool {
+ for _, v := range args {
+ if v == "" {
+ return false
+ }
+ }
+
+ return true
+}
+
+// Create is used to create a DB entry
+func (m *MongoStore) Create(coll, key, tag string, data interface{}) error {
+ if data == nil || !m.validateParams(coll, key, tag) {
+ return pkgerrors.New("No Data to store")
+ }
+
+ c := getCollection(coll, m)
+ ctx := context.Background()
+
+ //Insert the data and then add the objectID to the masterTable
+ res, err := c.InsertOne(ctx, bson.D{
+ {tag, data},
+ })
+ if err != nil {
+ return pkgerrors.Errorf("Error inserting into database: %s", err.Error())
+ }
+
+ //Add objectID of created data to masterKey document
+ //Create masterkey document if it does not exist
+ filter := bson.D{{"key", key}}
+
+ _, err = decodeBytes(
+ c.FindOneAndUpdate(
+ ctx,
+ filter,
+ bson.D{
+ {"$set", bson.D{
+ {tag, res.InsertedID},
+ }},
+ },
+ options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)))
+
+ if err != nil {
+ return pkgerrors.Errorf("Error updating master table: %s", err.Error())
+ }
+
+ return nil
+}
+
+// Unmarshal implements an unmarshaler for bson data that
+// is produced from the mongo database
+func (m *MongoStore) Unmarshal(inp []byte, out interface{}) error {
+ err := bson.Unmarshal(inp, out)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Unmarshaling bson")
+ }
+ return nil
+}
+
+// Read method returns the data stored for this key and for this particular tag
+func (m *MongoStore) Read(coll, key, tag string) ([]byte, error) {
+ if !m.validateParams(coll, key, tag) {
+ return nil, pkgerrors.New("Mandatory fields are missing")
+ }
+
+ c := getCollection(coll, m)
+ ctx := context.Background()
+
+ //Get the masterkey document based on given key
+ filter := bson.D{{"key", key}}
+ keydata, err := decodeBytes(c.FindOne(context.Background(), filter))
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error finding master table: %s", err.Error())
+ }
+
+ //Read the tag objectID from document
+ tagoid, ok := keydata.Lookup(tag).ObjectIDOK()
+ if !ok {
+ return nil, pkgerrors.Errorf("Error finding objectID for tag %s", tag)
+ }
+
+ //Use tag objectID to read the data from store
+ filter = bson.D{{"_id", tagoid}}
+ tagdata, err := decodeBytes(c.FindOne(ctx, filter))
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error reading found object: %s", err.Error())
+ }
+
+ //Return the data as a byte array
+ return tagdata.Lookup(tag).Value, nil
+}
+
+// Helper function that deletes an object by its ID
+func (m *MongoStore) deleteObjectByID(coll string, objID primitive.ObjectID) error {
+
+ c := getCollection(coll, m)
+ ctx := context.Background()
+
+ _, err := c.DeleteOne(ctx, bson.D{{"_id", objID}})
+ if err != nil {
+ return pkgerrors.Errorf("Error Deleting from database: %s", err.Error())
+ }
+
+ log.Printf("Deleted Obj with ID %s", objID.String())
+ return nil
+}
+
+// Delete method removes a document from the Database that matches key
+// TODO: delete all referenced docs if tag is empty string
+func (m *MongoStore) Delete(coll, key, tag string) error {
+ if !m.validateParams(coll, key, tag) {
+ return pkgerrors.New("Mandatory fields are missing")
+ }
+
+ c := getCollection(coll, m)
+ ctx := context.Background()
+
+ //Get the masterkey document based on given key
+ filter := bson.D{{"key", key}}
+ //Remove the tag ID entry from masterkey table
+ update := bson.D{
+ {
+ "$unset", bson.D{
+ {tag, ""},
+ },
+ },
+ }
+ keydata, err := decodeBytes(c.FindOneAndUpdate(ctx, filter, update,
+ options.FindOneAndUpdate().SetReturnDocument(options.Before)))
+ if err != nil {
+ return pkgerrors.Errorf("Error decoding master table after update: %s",
+ err.Error())
+ }
+
+ //Read the tag objectID from document
+ elems, err := keydata.Elements()
+ if err != nil {
+ return pkgerrors.Errorf("Error reading elements from database: %s", err.Error())
+ }
+
+ tagoid, ok := keydata.Lookup(tag).ObjectIDOK()
+ if !ok {
+ return pkgerrors.Errorf("Error finding objectID for tag %s", tag)
+ }
+
+ //Use tag objectID to read the data from store
+ err = m.deleteObjectByID(coll, tagoid)
+ if err != nil {
+ return pkgerrors.Errorf("Error deleting from database: %s", err.Error())
+ }
+
+ //Delete master table if no more tags left
+ //_id, key and tag should be elements in before doc
+ //if master table needs to be removed too
+ if len(elems) == 3 {
+ keyid, ok := keydata.Lookup("_id").ObjectIDOK()
+ if !ok {
+ return pkgerrors.Errorf("Error finding objectID for key %s", key)
+ }
+ err = m.deleteObjectByID(coll, keyid)
+ if err != nil {
+ return pkgerrors.Errorf("Error deleting master table from database: %s", err.Error())
+ }
+ }
+
+ return nil
+}
+
+// ReadAll is used to get all documents in db of a particular tag
+func (m *MongoStore) ReadAll(coll, tag string) (map[string][]byte, error) {
+ if !m.validateParams(coll, tag) {
+ return nil, pkgerrors.New("Missing collection or tag name")
+ }
+
+ c := getCollection(coll, m)
+ ctx := context.Background()
+
+ //Get all master tables in this collection
+ filter := bson.D{
+ {"key", bson.D{
+ {"$exists", true},
+ }},
+ }
+ cursor, err := c.Find(ctx, filter)
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error reading from database: %s", err.Error())
+ }
+ defer cursor.Close(ctx)
+
+ //Iterate over all the master tables
+ result := make(map[string][]byte)
+ for cursor.Next(ctx) {
+ d, err := cursor.DecodeBytes()
+ if err != nil {
+ log.Printf("Unable to decode data in Readall: %s", err.Error())
+ continue
+ }
+
+ //Read key of each master table
+ key, ok := d.Lookup("key").StringValueOK()
+ if !ok {
+ log.Printf("Unable to read key string from mastertable %s", err.Error())
+ continue
+ }
+
+ //Get objectID of tag document
+ tid, ok := d.Lookup(tag).ObjectIDOK()
+ if !ok {
+ log.Printf("Did not find tag: %s", tag)
+ continue
+ }
+
+ //Find tag document and unmarshal it into []byte
+ tagData, err := decodeBytes(c.FindOne(ctx, bson.D{{"_id", tid}}))
+ if err != nil {
+ log.Printf("Unable to decode tag data %s", err.Error())
+ continue
+ }
+ result[key] = tagData.Lookup(tag).Value
+ }
+
+ if len(result) == 0 {
+ return result, pkgerrors.Errorf("Did not find any objects with tag: %s", tag)
+ }
+
+ return result, nil
+}