summaryrefslogtreecommitdiffstats
path: root/conductor
diff options
context:
space:
mode:
authorShankaranarayanan Puzhavakath Narayanan <snarayanan@research.att.com>2017-12-17 15:39:20 +0000
committerGerrit Code Review <gerrit@onap.org>2017-12-17 15:39:20 +0000
commit09dc0f0a13b898a5c59be0ddd83ea20654505a01 (patch)
treea4aa2cd82dc995d8f8e367ec422116e4f00a8f63 /conductor
parentb739f6148847c1dcf17b72215936b2394dee0ba1 (diff)
parentef05261dc4e54b91d024b4290463119a40b5efb7 (diff)
Merge "Added reservation directory to the repository"
Diffstat (limited to 'conductor')
-rw-r--r--conductor/conductor/reservation/__init__.py20
-rw-r--r--conductor/conductor/reservation/service.py370
2 files changed, 390 insertions, 0 deletions
diff --git a/conductor/conductor/reservation/__init__.py b/conductor/conductor/reservation/__init__.py
new file mode 100644
index 0000000..e615a9c
--- /dev/null
+++ b/conductor/conductor/reservation/__init__.py
@@ -0,0 +1,20 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+from .service import ReservationServiceLauncher # noqa: F401
diff --git a/conductor/conductor/reservation/service.py b/conductor/conductor/reservation/service.py
new file mode 100644
index 0000000..c2b0ba8
--- /dev/null
+++ b/conductor/conductor/reservation/service.py
@@ -0,0 +1,370 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import cotyledon
+from oslo_config import cfg
+from oslo_log import log
+
+from conductor.common.models import plan
+from conductor.common.music import api
+from conductor.common.music import messaging as music_messaging
+from conductor.common.music.model import base
+from conductor.i18n import _LE, _LI
+from conductor import messaging
+from conductor import service
+
+LOG = log.getLogger(__name__)
+
+CONF = cfg.CONF
+
+reservation_OPTS = [
+ cfg.IntOpt('workers',
+ default=1,
+ min=1,
+ help='Number of workers for reservation service. '
+ 'Default value is 1.'),
+ cfg.IntOpt('reserve_retries',
+ default=3,
+ help='Number of times reservation/release '
+ 'should be attempted.'),
+ cfg.IntOpt('reserve_counter',
+ default=3,
+ help='Number of times a plan should'
+ 'be attempted to reserve.'),
+ cfg.BoolOpt('concurrent',
+ default=False,
+ help='Set to True when reservation will run in active-active '
+ 'mode. When set to False, reservation will restart any '
+ 'orphaned reserving requests at startup.'),
+]
+
+CONF.register_opts(reservation_OPTS, group='reservation')
+
+# Pull in service opts. We use them here.
+OPTS = service.OPTS
+CONF.register_opts(OPTS)
+
+
+class ReservationServiceLauncher(object):
+ """Launcher for the reservation service."""
+
+ def __init__(self, conf):
+ self.conf = conf
+
+ # Set up Music access.
+ self.music = api.API()
+ self.music.keyspace_create(keyspace=conf.keyspace)
+
+ # Dynamically create a plan class for the specified keyspace
+ self.Plan = base.create_dynamic_model(
+ keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan")
+
+ if not self.Plan:
+ raise
+
+ def run(self):
+ kwargs = {'plan_class': self.Plan}
+ svcmgr = cotyledon.ServiceManager()
+ svcmgr.add(ReservationService,
+ workers=self.conf.reservation.workers,
+ args=(self.conf,), kwargs=kwargs)
+ svcmgr.run()
+
+
+class ReservationService(cotyledon.Service):
+ """reservation service."""
+
+ # This will appear in 'ps xaf'
+ name = "Conductor Reservation"
+
+ def __init__(self, worker_id, conf, **kwargs):
+ """Initializer"""
+ LOG.debug("%s" % self.__class__.__name__)
+ super(ReservationService, self).__init__(worker_id)
+ self._init(conf, **kwargs)
+ self.running = True
+
+ def _init(self, conf, **kwargs):
+ """Set up the necessary ingredients."""
+ self.conf = conf
+ self.kwargs = kwargs
+
+ self.Plan = kwargs.get('plan_class')
+
+ # Set up the RPC service(s) we want to talk to.
+ self.data_service = self.setup_rpc(conf, "data")
+
+ # Set up Music access.
+ self.music = api.API()
+
+ # Number of retries for reservation/release
+ self.reservation_retries = self.conf.reservation.reserve_retries
+ self.reservation_counter = self.conf.reservation.reserve_counter
+
+ if not self.conf.reservation.concurrent:
+ self._reset_reserving_status()
+
+ def _gracefully_stop(self):
+ """Gracefully stop working on things"""
+ pass
+
+ def _reset_reserving_status(self):
+ """Reset plans being reserved so they can be reserved again.
+
+ Use this only when the reservation service is not running concurrently.
+ """
+ plans = self.Plan.query.all()
+ for the_plan in plans:
+ if the_plan.status == self.Plan.RESERVING:
+ the_plan.status = self.Plan.SOLVED
+ the_plan.update()
+
+ def _restart(self):
+ """Prepare to restart the service"""
+ pass
+
+ def setup_rpc(self, conf, topic):
+ """Set up the RPC Client"""
+ # TODO(jdandrea): Put this pattern inside music_messaging?
+ transport = messaging.get_transport(conf=conf)
+ target = music_messaging.Target(topic=topic)
+ client = music_messaging.RPCClient(conf=conf,
+ transport=transport,
+ target=target)
+ return client
+
+ def try_reservation_call(self, method, candidate_list,
+ reservation_name, reservation_type,
+ controller, request):
+ # Call data service for reservation
+ # need to do this for self.reserve_retries times
+ ctxt = {}
+ args = {'method': method,
+ 'candidate_list': candidate_list,
+ 'reservation_name': reservation_name,
+ 'reservation_type': reservation_type,
+ 'controller': controller,
+ 'request': request
+ }
+
+ method_name = "call_reservation_operation"
+ attempt_count = 1
+ while attempt_count <= self.reservation_retries:
+ is_success = self.data_service.call(ctxt=ctxt,
+ method=method_name,
+ args=args)
+ LOG.debug("Attempt #{} calling method {} for candidate "
+ "{} - response: {}".format(attempt_count,
+ method,
+ candidate_list,
+ is_success))
+ if is_success:
+ return True
+ attempt_count += 1
+ return False
+
+ def rollback_reservation(self, reservation_list):
+ """Function to rollback(release) reservations"""
+ # TODO(snarayanan): Need to test this once the API is ready
+ for reservation in reservation_list:
+ candidate_list = reservation['candidate_list']
+ reservation_name = reservation['reservation_name']
+ reservation_type = reservation['reservation_type']
+ controller = reservation['controller']
+ request = reservation['request']
+
+ is_success = self.try_reservation_call(
+ method="release",
+ candidate_list=candidate_list,
+ reservation_name=reservation_name,
+ reservation_type=reservation_type,
+ controller=controller,
+ request=request
+ )
+ if not is_success:
+ # rollback failed report error to SDNC
+ message = _LE("Unable to release reservation "
+ "{}").format(reservation)
+ LOG.error(message)
+ return False
+ # move to the next reserved candidate
+ return True
+
+ def run(self):
+ """Run"""
+ LOG.debug("%s" % self.__class__.__name__)
+ # TODO(snarayanan): This is really meant to be a control loop
+ # As long as self.running is true, we process another request.
+ while self.running:
+ # plans = Plan.query().all()
+ # Find the first plan with a status of SOLVED.
+ # Change its status to RESERVING.
+
+ solution = None
+ translation = None
+ # requests_to_reserve = dict()
+ plans = self.Plan.query.all()
+ found_solved_template = False
+
+ for p in plans:
+ if p.status == self.Plan.SOLVED:
+ solution = p.solution
+ translation = p.translation
+ found_solved_template = True
+ break
+ if found_solved_template and not solution:
+ message = _LE("Plan {} status is solved, yet "
+ "the solution wasn't found").format(p.id)
+ LOG.error(message)
+ p.status = self.Plan.ERROR
+ p.message = message
+ p.update()
+ continue # continue looping
+ elif not solution:
+ continue # continue looping
+
+ # update status to reserving
+ p.status = self.Plan.RESERVING
+ p.update()
+
+ # begin reservations
+ # if plan needs reservation proceed with reservation
+ # else set status to done.
+ reservations = None
+ if translation:
+ conductor_solver = translation.get("conductor_solver")
+ if conductor_solver:
+ reservations = conductor_solver.get("reservations")
+ else:
+ LOG.error("no conductor_solver in "
+ "translation for Plan {}".format(p.id))
+
+ if reservations:
+ counter = reservations.get("counter") + 1
+ reservations['counter'] = counter
+ if counter <= self.reservation_counter:
+ recommendations = solution.get("recommendations")
+ reservation_list = list()
+
+ for reservation, resource in reservations.get("demands",
+ {}).items():
+ candidates = list()
+ reservation_demand = resource.get("demand")
+ reservation_name = resource.get("name")
+ reservation_type = resource.get("type")
+
+ reservation_properties = resource.get("properties")
+ if reservation_properties:
+ controller = reservation_properties.get(
+ "controller")
+ request = reservation_properties.get("request")
+
+ for recommendation in recommendations:
+ for demand, r_resource in recommendation.items():
+ if demand == reservation_demand:
+ # get selected candidate from translation
+ selected_candidate_id = \
+ r_resource.get("candidate")\
+ .get("candidate_id")
+ demands = \
+ translation.get("conductor_solver")\
+ .get("demands")
+ for demand_name, d_resource in \
+ demands.items():
+ if demand_name == demand:
+ for candidate in d_resource\
+ .get("candidates"):
+ if candidate\
+ .get("candidate_id") ==\
+ selected_candidate_id:
+ candidates\
+ .append(candidate)
+
+ is_success = self.try_reservation_call(
+ method="reserve",
+ candidate_list=candidates,
+ reservation_name=reservation_name,
+ reservation_type=reservation_type,
+ controller=controller,
+ request=request)
+
+ # if reservation succeeds continue with next candidate
+ if is_success:
+ curr_reservation = dict()
+ curr_reservation['candidate_list'] = candidates
+ curr_reservation['reservation_name'] = \
+ reservation_name
+ curr_reservation['reservation_type'] = \
+ reservation_type
+ curr_reservation['controller'] = controller
+ curr_reservation['request'] = request
+ reservation_list.append(curr_reservation)
+ else:
+ # begin roll back of all reserved resources on
+ # the first failed reservation
+ rollback_status = \
+ self.rollback_reservation(reservation_list)
+ # statuses
+ if rollback_status:
+ # released all reservations,
+ # move plan to translated
+ p.status = self.Plan.TRANSLATED
+ p.update()
+ del reservation_list[:]
+ else:
+ LOG.error("Reservation rollback failed")
+ p.status = self.Plan.ERROR
+ p.message = "Reservation release failed"
+ p.update()
+ break # reservation failed
+
+ continue
+ # continue with reserving the next candidate
+ else:
+ LOG.error("Tried {} times. Plan {} is unable to make"
+ "reservation "
+ .format(self.reservation_counter, p.id))
+ p.status = self.Plan.ERROR
+ p.message = "Reservation failed"
+ p.update()
+ continue
+
+ # verify if all candidates have been reserved
+ if p.status == self.Plan.RESERVING:
+ # all reservations succeeded.
+ LOG.info(_LI("Plan {} Reservation complete").
+ format(p.id))
+ LOG.debug("Plan {} Reservation complete".format(p.id))
+ p.status = self.Plan.DONE
+ p.update()
+
+ continue
+ # done reserving continue to loop
+
+ def terminate(self):
+ """Terminate"""
+ LOG.debug("%s" % self.__class__.__name__)
+ self.running = False
+ self._gracefully_stop()
+ super(ReservationService, self).terminate()
+
+ def reload(self):
+ """Reload"""
+ LOG.debug("%s" % self.__class__.__name__)
+ self._restart()