summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py364
1 files changed, 364 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py b/azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py
new file mode 100644
index 0000000..6bdb5b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/extensions/aria_extension_tosca/simple_v1_0/modeling/requirements.py
@@ -0,0 +1,364 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria.parser.validation import Issue
+from aria.utils.collections import (deepcopy_with_locators, OrderedDict)
+
+from .parameters import (convert_parameter_definitions_to_values, validate_required_values,
+ coerce_parameter_value)
+from .interfaces import (convert_requirement_interface_definitions_from_type_to_raw_template,
+ merge_interface_definitions, merge_interface, validate_required_inputs)
+
+
+#
+# NodeType
+#
+
+def get_inherited_requirement_definitions(context, presentation):
+ """
+ Returns our requirement definitions added on top of those of our parent, if we have one
+ (recursively).
+
+ Allows overriding requirement definitions if they have the same name.
+ """
+
+ parent = presentation._get_parent(context)
+ requirement_definitions = get_inherited_requirement_definitions(context, parent) \
+ if parent is not None else []
+
+ our_requirement_definitions = presentation.requirements
+ if our_requirement_definitions:
+ for requirement_name, our_requirement_definition in our_requirement_definitions:
+ # Remove existing requirement definitions of this name if they exist
+ for name, requirement_definition in requirement_definitions:
+ if name == requirement_name:
+ requirement_definitions.remove((name, requirement_definition))
+
+ requirement_definitions.append((requirement_name, our_requirement_definition))
+
+ return requirement_definitions
+
+
+#
+# NodeTemplate
+#
+
+def get_template_requirements(context, presentation):
+ """
+ Returns our requirements added on top of those of the node type if they exist there.
+
+ If the requirement has a relationship, the relationship properties and interfaces are assigned.
+
+ Returns the assigned property, interface input, and interface operation input values while
+ making sure they are defined in our type. Default values, if available, will be used if we did
+ not assign them. Also makes sure that required properties and inputs indeed end up with a value.
+ """
+
+ requirement_assignments = []
+
+ the_type = presentation._get_type(context) # NodeType
+ requirement_definitions = the_type._get_requirements(context) if the_type is not None else None
+
+ # Add our requirement assignments
+ our_requirement_assignments = presentation.requirements
+ if our_requirement_assignments:
+ add_requirement_assignments(context, presentation, requirement_assignments,
+ requirement_definitions, our_requirement_assignments)
+
+ # Validate occurrences
+ if requirement_definitions:
+ for requirement_name, requirement_definition in requirement_definitions:
+ # Allowed occurrences
+ allowed_occurrences = requirement_definition.occurrences
+ allowed_occurrences = allowed_occurrences if allowed_occurrences is not None else None
+
+ # Count actual occurrences
+ actual_occurrences = 0
+ for name, _ in requirement_assignments:
+ if name == requirement_name:
+ actual_occurrences += 1
+
+ if allowed_occurrences is None:
+ # If not specified, we interpret this to mean that exactly 1 occurrence is required
+ if actual_occurrences == 0:
+ # If it's not there, we will automatically add it (this behavior is not in the
+ # TOSCA spec, but seems implied)
+ requirement_assignment, \
+ relationship_property_definitions, \
+ relationship_interface_definitions = \
+ convert_requirement_from_definition_to_assignment(context,
+ requirement_definition,
+ None, presentation)
+ validate_requirement_assignment(context, presentation, requirement_assignment,
+ relationship_property_definitions,
+ relationship_interface_definitions)
+ requirement_assignments.append((requirement_name, requirement_assignment))
+ elif actual_occurrences > 1:
+ context.validation.report(
+ 'requirement "%s" is allowed only one occurrence in "%s": %d'
+ % (requirement_name, presentation._fullname, actual_occurrences),
+ locator=presentation._locator, level=Issue.BETWEEN_TYPES)
+ else:
+ if not allowed_occurrences.is_in(actual_occurrences):
+ if allowed_occurrences.value[1] == 'UNBOUNDED':
+ context.validation.report(
+ 'requirement "%s" does not have at least %d occurrences in "%s": has %d'
+ % (requirement_name, allowed_occurrences.value[0],
+ presentation._fullname, actual_occurrences),
+ locator=presentation._locator, level=Issue.BETWEEN_TYPES)
+ else:
+ context.validation.report(
+ 'requirement "%s" is allowed between %d and %d occurrences in "%s":'
+ ' has %d'
+ % (requirement_name, allowed_occurrences.value[0],
+ allowed_occurrences.value[1], presentation._fullname,
+ actual_occurrences),
+ locator=presentation._locator, level=Issue.BETWEEN_TYPES)
+
+ return requirement_assignments
+
+
+#
+# Utils
+#
+
+def convert_requirement_from_definition_to_assignment(context, requirement_definition, # pylint: disable=too-many-branches
+ our_requirement_assignment, container):
+ from ..assignments import RequirementAssignment
+
+ raw = OrderedDict()
+
+ # Capability type name:
+ raw['capability'] = deepcopy_with_locators(requirement_definition.capability)
+
+ node_type = requirement_definition._get_node_type(context)
+ if node_type is not None:
+ raw['node'] = deepcopy_with_locators(node_type._name)
+
+ relationship_type = None
+ relationship_template = None
+ relationship_property_definitions = None
+ relationship_interface_definitions = None
+
+ # First try to find the relationship if we declared it
+ # RelationshipAssignment:
+ our_relationship = our_requirement_assignment.relationship \
+ if our_requirement_assignment is not None else None
+ if our_relationship is not None:
+ relationship_type, relationship_type_variant = our_relationship._get_type(context)
+ if relationship_type_variant == 'relationship_template':
+ relationship_template = relationship_type
+ relationship_type = relationship_template._get_type(context)
+
+ definition_relationship_type = None
+ relationship_definition = requirement_definition.relationship # RelationshipDefinition
+ if relationship_definition is not None:
+ definition_relationship_type = relationship_definition._get_type(context)
+
+ # If not exists, try at the node type
+ if relationship_type is None:
+ relationship_type = definition_relationship_type
+ else:
+ # Make sure the type is derived
+ if not definition_relationship_type._is_descendant(context, relationship_type):
+ context.validation.report(
+ 'assigned relationship type "%s" is not a descendant of declared relationship type'
+ ' "%s"' \
+ % (relationship_type._name, definition_relationship_type._name),
+ locator=container._locator, level=Issue.BETWEEN_TYPES)
+
+ if relationship_type is not None:
+ raw['relationship'] = OrderedDict()
+
+ type_name = our_relationship.type if our_relationship is not None else None
+ if type_name is None:
+ type_name = relationship_type._name
+
+ raw['relationship']['type'] = deepcopy_with_locators(type_name)
+
+ # These are our property definitions
+ relationship_property_definitions = relationship_type._get_properties(context)
+
+ if relationship_template is not None:
+ # Property values from template
+ raw['relationship']['properties'] = relationship_template._get_property_values(context)
+ else:
+ if relationship_property_definitions:
+ # Convert property definitions to values
+ raw['relationship']['properties'] = \
+ convert_parameter_definitions_to_values(context,
+ relationship_property_definitions)
+
+ # These are our interface definitions
+ # InterfaceDefinition:
+ relationship_interface_definitions = OrderedDict(relationship_type._get_interfaces(context))
+
+ # Convert interface definitions to templates
+ convert_requirement_interface_definitions_from_type_to_raw_template(
+ context,
+ raw['relationship'],
+ relationship_interface_definitions)
+
+ if relationship_definition:
+ # Merge extra interface definitions
+ # InterfaceDefinition:
+ definition_interface_definitions = relationship_definition.interfaces
+ merge_interface_definitions(context, relationship_interface_definitions,
+ definition_interface_definitions, requirement_definition,
+ container)
+
+ if relationship_template is not None:
+ # Interfaces from template
+ interfaces = relationship_template._get_interfaces(context)
+ if interfaces:
+ raw['relationship']['interfaces'] = OrderedDict()
+ for interface_name, interface in interfaces.iteritems():
+ raw['relationship']['interfaces'][interface_name] = interface._raw
+
+ return \
+ RequirementAssignment(name=requirement_definition._name, raw=raw, container=container), \
+ relationship_property_definitions, \
+ relationship_interface_definitions
+
+
+def add_requirement_assignments(context, presentation, requirement_assignments,
+ requirement_definitions, our_requirement_assignments):
+ for requirement_name, our_requirement_assignment in our_requirement_assignments:
+ requirement_definition = get_first_requirement(requirement_definitions, requirement_name)
+ if requirement_definition is not None:
+ requirement_assignment, \
+ relationship_property_definitions, \
+ relationship_interface_definitions = \
+ convert_requirement_from_definition_to_assignment(context, requirement_definition,
+ our_requirement_assignment,
+ presentation)
+ merge_requirement_assignment(context,
+ relationship_property_definitions,
+ relationship_interface_definitions,
+ requirement_assignment, our_requirement_assignment)
+ validate_requirement_assignment(context,
+ our_requirement_assignment.relationship \
+ or our_requirement_assignment,
+ requirement_assignment,
+ relationship_property_definitions,
+ relationship_interface_definitions)
+ requirement_assignments.append((requirement_name, requirement_assignment))
+ else:
+ context.validation.report('requirement "%s" not declared at node type "%s" in "%s"'
+ % (requirement_name, presentation.type,
+ presentation._fullname),
+ locator=our_requirement_assignment._locator,
+ level=Issue.BETWEEN_TYPES)
+
+
+def merge_requirement_assignment(context, relationship_property_definitions,
+ relationship_interface_definitions, requirement, our_requirement):
+ our_capability = our_requirement.capability
+ if our_capability is not None:
+ requirement._raw['capability'] = deepcopy_with_locators(our_capability)
+
+ our_node = our_requirement.node
+ if our_node is not None:
+ requirement._raw['node'] = deepcopy_with_locators(our_node)
+
+ our_node_filter = our_requirement.node_filter
+ if our_node_filter is not None:
+ requirement._raw['node_filter'] = deepcopy_with_locators(our_node_filter._raw)
+
+ our_relationship = our_requirement.relationship # RelationshipAssignment
+ if (our_relationship is not None) and (our_relationship.type is None):
+ # Make sure we have a dict
+ if 'relationship' not in requirement._raw:
+ requirement._raw['relationship'] = OrderedDict()
+
+ merge_requirement_assignment_relationship(context, our_relationship,
+ relationship_property_definitions,
+ relationship_interface_definitions,
+ requirement, our_relationship)
+
+
+def merge_requirement_assignment_relationship(context, presentation, property_definitions,
+ interface_definitions, requirement, our_relationship):
+ our_relationship_properties = our_relationship._raw.get('properties')
+ if our_relationship_properties:
+ # Make sure we have a dict
+ if 'properties' not in requirement._raw['relationship']:
+ requirement._raw['relationship']['properties'] = OrderedDict()
+
+ # Merge our properties
+ for property_name, prop in our_relationship_properties.iteritems():
+ if property_name in property_definitions:
+ definition = property_definitions[property_name]
+ requirement._raw['relationship']['properties'][property_name] = \
+ coerce_parameter_value(context, presentation, definition, prop)
+ else:
+ context.validation.report(
+ 'relationship property "%s" not declared at definition of requirement "%s"'
+ ' in "%s"'
+ % (property_name, requirement._fullname,
+ presentation._container._container._fullname),
+ locator=our_relationship._get_child_locator('properties', property_name),
+ level=Issue.BETWEEN_TYPES)
+
+ our_interfaces = our_relationship.interfaces
+ if our_interfaces:
+ # Make sure we have a dict
+ if 'interfaces' not in requirement._raw['relationship']:
+ requirement._raw['relationship']['interfaces'] = OrderedDict()
+
+ # Merge interfaces
+ for interface_name, our_interface in our_interfaces.iteritems():
+ if interface_name not in requirement._raw['relationship']['interfaces']:
+ requirement._raw['relationship']['interfaces'][interface_name] = OrderedDict()
+
+ if (interface_definitions is not None) and (interface_name in interface_definitions):
+ interface_definition = interface_definitions[interface_name]
+ interface_assignment = requirement.relationship.interfaces[interface_name]
+ merge_interface(context, presentation, interface_assignment, our_interface,
+ interface_definition, interface_name)
+ else:
+ context.validation.report(
+ 'relationship interface "%s" not declared at definition of requirement "%s"'
+ ' in "%s"'
+ % (interface_name, requirement._fullname,
+ presentation._container._container._fullname),
+ locator=our_relationship._locator, level=Issue.BETWEEN_TYPES)
+
+
+def validate_requirement_assignment(context, presentation, requirement_assignment,
+ relationship_property_definitions,
+ relationship_interface_definitions):
+ relationship = requirement_assignment.relationship
+ if relationship is None:
+ return
+
+ validate_required_values(context, presentation, relationship.properties,
+ relationship_property_definitions)
+
+ if relationship_interface_definitions:
+ for interface_name, relationship_interface_definition \
+ in relationship_interface_definitions.iteritems():
+ interface_assignment = relationship.interfaces.get(interface_name) \
+ if relationship.interfaces is not None else None
+ validate_required_inputs(context, presentation, interface_assignment,
+ relationship_interface_definition, None, interface_name)
+
+
+def get_first_requirement(requirement_definitions, name):
+ if requirement_definitions is not None:
+ for requirement_name, requirement_definition in requirement_definitions:
+ if requirement_name == name:
+ return requirement_definition
+ return None