aboutsummaryrefslogtreecommitdiffstats
path: root/services/api/api_rados.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/api/api_rados.py')
-rw-r--r--services/api/api_rados.py85
1 files changed, 32 insertions, 53 deletions
diff --git a/services/api/api_rados.py b/services/api/api_rados.py
index 61cfa5c..cdad7d4 100644
--- a/services/api/api_rados.py
+++ b/services/api/api_rados.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,7 +36,6 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
import time
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
@@ -50,6 +49,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class APIRados:
@staticmethod
@@ -75,77 +75,56 @@ class APIRados:
@staticmethod
def get_bucket_grants(bucket_name):
"""Return the Grants."""
- counter = 1
- bucket = APIRados.get_bucket(bucket_name)
- while not bucket and counter <= Constants.RGWAConstants.BUCKET_RETRIES_NUMBER:
- logger.error("Bucket not found. Retry #%s" % counter)
- time.sleep(session.wait_until_time_pause_long)
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
bucket = APIRados.get_bucket(bucket_name)
- counter += 1
- if not bucket:
+ if bucket:
+ break
+ logger.error("Bucket not found. Retry #%s" % counter+1)
+ time.sleep(session.wait_until_time_pause_long)
+ else:
raise TimeoutError("Max retries exceeded, failing test...")
grants = bucket.list_grants()
- print("***********grants=", grants)
return grants
@staticmethod
def is_bucket_ready(bucket_id):
- counter = 1
- bucket = APIRados.get_bucket(bucket_id)
- while (bucket == None and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- time.sleep(session.wait_until_time_pause_long)
- logger.debug(
- "bucket are not ready yet, trying again (%s of 180)" % counter)
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
bucket = APIRados.get_bucket(bucket_id)
- counter += 1
- print("****_+__+bucket= ", str(bucket))
- time.sleep(session.wait_until_time_pause_long)
- if bucket == None:
+ if bucket:
+ break
+ logger.debug(
+ "bucket are not ready yet, trying again (%s of %s)" % (
+ counter+1, Constants.RGWAConstants.BUCKET_RETRIES_NUMBER))
+ time.sleep(session.wait_until_time_pause_long)
+ else:
raise TimeoutError("Max retries exceeded, failing test...")
- elif bucket != None:
- logger.debug("bucket are ready to continue!")
- return True
+ logger.debug("bucket are ready to continue!")
+ return True
@staticmethod
def users_of_bucket_ready_after_complete(bucket_id, user_name):
- grants = APIRados.get_bucket_grants(bucket_id)
- count = 0
- counter = 1
- while (count != 0 and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
grants = APIRados.get_bucket_grants(bucket_id)
+ if not any(user_name == g.id for g in grants):
+ break
time.sleep(session.wait_until_time_pause_long)
- for g in grants:
- if g.id == user_name:
- count = +1
- time.sleep(session.wait_until_time_pause_long)
- if count != 0:
+ else:
raise Exception("Max retries exceeded, failing test...")
return False
- elif count == 0:
- logger.debug("users_of_bucket are ready to continue!")
- return True
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
@staticmethod
- def users_of_bucket_ready_after_created(bucket_id, user_name):
- grants = APIRados.get_bucket_grants(bucket_id)
- count = 0
- counter = 1
- while (count == 0 and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ def users_of_bucket_ready_after_created(bucket_id, user_uuid):
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
grants = APIRados.get_bucket_grants(bucket_id)
+ if any(user_uuid == g.id for g in grants):
+ break
time.sleep(session.wait_until_time_pause_long)
- for g in grants:
- if g.id == user_name:
- count = +1
- time.sleep(session.wait_until_time_pause_long)
- if count == 0:
+ else:
raise Exception("Max retries exceeded, failing test...")
- return False
- elif count > 0:
- logger.debug("users_of_bucket are ready to continue!")
- return True
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
@staticmethod
def specific_client(access_key_id, secret_access_key):