Source code for cottonformation.core.template

# -*- coding: utf-8 -*-

"""
This module implements the core component CloudFormation Template. Many
black magic features are provided.
"""

import typing as T
import json
import attr
from collections import OrderedDict

import cfn_flip
from toposort import toposort

from .model import (
    _Addable,
    Parameter,
    Resource,
    Output,
    Rule,
    Mapping,
    Condition,
    Transform,
    ResourceGroup,
    Tag,
    TypeHint,
    get_id,
    get_key_value_dict,
    remove_id_and_empty,
    serialize,
    _class_type_to_attr_mapper,
)
from .constant import MetaData, CloudFomation
from .config import CtfConfig
from .exc import (
    AWSObjectLogicIdConflictError,
    AWSObjectNotExistsError,
)
from ..res.cloudformation import Stack
from .._version import __version__


[docs]@attr.s class Template: """ Represent an AWS CloudFormation template object. .. warning:: Don't ever directly edit the ``Template.Parameters``, ``Template.Resources``, ``Template.Outputs`` container dictionary. Please use the ``Template.add``, ``Template.remove`` api. Because there's a lot of logic been handled to maintain the state of the relationship. Reference: - Template anatomy: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/template-anatomy.html .. versionadded:: 1.0.1 """ AWSTemplateFormatVersion: str = attr.ib(default="2010-09-09") Description: str = attr.ib(default="No description for this template") Metadata: dict = attr.ib(factory=OrderedDict) Parameters: T.Dict[str, Parameter] = attr.ib(factory=OrderedDict) Rules: T.Dict[str, Rule] = attr.ib(factory=OrderedDict) Mappings: T.Dict[str, Mapping] = attr.ib(factory=OrderedDict) Conditions: T.Dict[str, Condition] = attr.ib(factory=OrderedDict) Resources: T.Dict[str, Resource] = attr.ib(factory=OrderedDict) Outputs: T.Dict[str, Output] = attr.ib(factory=OrderedDict) Transform: T.List["Transform"] = attr.ib(factory=list) NestedStack: T.Dict[str, "Template"] = attr.ib(factory=OrderedDict) Groups: T.Dict[str, "ResourceGroup"] = attr.ib(factory=OrderedDict) _deps_data_need_build_flag: bool = attr.ib(default=True) _deps_on_data_cache: T.Dict[str, T.Set[str]] = attr.ib( factory=OrderedDict ) _deps_by_data_cache: T.Dict[str, T.Set[str]] = attr.ib( factory=OrderedDict ) _deps_sort_need_build_flag: bool = attr.ib(default=True) _deps_sort_cache: T.Dict[str, int] = attr.ib(factory=OrderedDict) @property def n_parameter(self): """ Return number of Parameters declared. """ return len(self.Parameters) @property def n_resource(self): """ Return number of Resources declared. """ return len(self.Resources) @property def n_output(self): """ Return number of Outputs declared. """ return len(self.Outputs) @property def n_rule(self): """ Return number of Rules declared. """ return len(self.Rules) @property def n_mapping(self): """ Return number of Mappings declared. """ return len(self.Mappings) @property def n_condition(self): """ Return number of Conditions declared. """ return len(self.Conditions) @property def n_transform(self): """ Return number of Transform declared. """ return len(self.Transform) @property def n_named_object(self): """ Return number of named object declared in this template. For example, Parameter, Resource, Output, Rule, Mapping, Condition are named object, because they have a logic id. """ return sum( [ self.n_parameter, self.n_resource, self.n_output, self.n_rule, self.n_mapping, self.n_condition, ] ) # handle the inter dependency relationship among Parameter, Mapping, # Condition, Resource, Output def _encode_depends_on( self, obj_list: T.List[TypeHint.dependency_obj], ) -> T.Set[str]: """ In generic dependency resolver algorithm, we don't need object, we only need the gid string. This method can ensure return a list of gid. """ st = set() for obj in obj_list: if isinstance(obj, str): st.add(self.Resources[obj].gid) else: st.add(obj.gid) return st def _iterate_addable( self, include_resource_group: bool = False, ) -> T.List[T.Tuple[str, TypeHint.addable_obj]]: """ Iterate through all addable object (Parameter, Resource, Output, ...). """ l = list() for class_type, attr_name in _class_type_to_attr_mapper.items(): if class_type == ResourceGroup.CLASS_TYPE: if include_resource_group is False: continue collection = getattr(self, attr_name) for obj in collection.values(): l.append((obj.gid, obj)) return l def _iterate_addable_keys( self, include_resource_group: bool = False, ) -> T.List[str]: return [gid for gid, _ in self._iterate_addable(include_resource_group)] def _build_deps_data( self, ) -> T.Tuple[ T.Dict[str, T.Set[str]], T.Dict[str, T.Set[str]] ]: deps_on_data = OrderedDict() deps_by_data = OrderedDict() for gid, _ in self._iterate_addable(include_resource_group=True): deps_on_data[gid] = set() deps_by_data[gid] = set() for gid, obj in self._iterate_addable(include_resource_group=True): deps_on_data[gid] = self._encode_depends_on(obj.DependsOn) for parent_gid in self._encode_depends_on(obj.DependsOn): deps_by_data[parent_gid].add(gid) return deps_on_data, deps_by_data @property def deps_on_data(self) -> T.Dict[str, T.Set[str]]: """ Depends on data is a dictionary structure. It shows the dependency relationship in this way (child depends on parent):: { child_id_1: {parent_id_11, parent_id_12, ...}, child_id_2: {parent_id_21, parent_id_22, ...}, ... } """ if self._deps_data_need_build_flag: deps_on_data, deps_by_data = self._build_deps_data() self._deps_on_data_cache = deps_on_data self._deps_by_data_cache = deps_by_data self._deps_data_need_build_flag = False self._deps_sort_need_build_flag = True return self._deps_on_data_cache @property def deps_by_data(self) -> T.Dict[str, T.Set[str]]: """ Depends on data is a dictionary structure. It shows the dependency relationship in this way (child depends on parent):: { parent_id_1: {child_id_11, child_id_12, ...}, parent_id_2: {child_id_21, child_id_22, ...}, ... } """ if self._deps_data_need_build_flag: deps_on_data, deps_by_data = self._build_deps_data() self._deps_on_data_cache = deps_on_data self._deps_by_data_cache = deps_by_data self._deps_data_need_build_flag = False self._deps_sort_need_build_flag = True return self._deps_by_data_cache @property def deps_sort(self) -> T.Dict[str, int]: if self._deps_sort_need_build_flag: self._deps_sort_cache = OrderedDict() for ind, st in enumerate(toposort(self.deps_on_data)): st = list(st) st.sort() for v in st: self._deps_sort_cache[v] = ind self._deps_sort_need_build_flag = False return self._deps_sort_cache # --- handle AWS Object
[docs] def add_one( self, obj: TypeHint.addable_obj, add_or_update: bool = False, add_or_ignore: bool = False, ) -> bool: """ Add single object to Template. If there is a existing object with the same logic id and no flag is passed, exception will be raised. :param obj: The object you add to template. :param add_or_update: if True, will overwrite other object with the same logic id :param add_or_ignore: if True, will ignore and pass if there's existing object with the same logic id :return: a boolean flag indicates that whether change is made. """ # validate argument if not isinstance(obj, _Addable): raise TypeError( f"You cannot add a {obj.__class__.__name__} object to template" ) if add_or_update and add_or_ignore: raise ValueError("Can't do add_or_update=True and add_or_ignore=True") obj: TypeHint.addable_obj # find values for future use collection: T.Union[ T.Dict[str, TypeHint.addable_obj], T.List[Transform], ] = getattr(self, _class_type_to_attr_mapper[obj.CLASS_TYPE]) # add this object if obj.id in collection: # handle logic id conflict if add_or_update: collection[obj.id] = obj self._deps_data_need_build_flag = True return True elif add_or_ignore: return False else: # raise exception if isinstance(obj, Resource): type_name = Resource.__name__ else: type_name = obj.__class__.__name__ raise AWSObjectLogicIdConflictError( f"{type_name} logic id '{obj.id}' already exists!" ) else: collection[obj.id] = obj self._deps_data_need_build_flag = True return True
[docs] def add( self, obj: TypeHint.addable_obj, _objects_to_update: T.Dict[str, TypeHint.addable_obj] = None, ): """ Add an AWS object to the template. If the obj declared some dependency AWS Objects like Parameter, Mapping, Condition, it will also add those objects. If there's any logic id conflict include the root object or those dependency objects, the new object will overwrite the existing one. This method is atomic. In other word, either all objects succeed or non. """ if _objects_to_update is None: is_first_call = True _objects_to_update = OrderedDict() else: is_first_call = False _objects_to_update[obj.gid] = obj # add dependency objects if obj.DependsOn is not None: for dep_obj in obj.DependsOn: if isinstance(dep_obj, str): if self.Resources: pass if dep_obj not in self.Resources: raise AWSObjectNotExistsError.make( obj_type=CloudFomation.Resources, logic_id=dep_obj, ) dep_obj = self.Resources[dep_obj] self.add(dep_obj, _objects_to_update=_objects_to_update) if is_first_call: # validate type before making the change for obj in _objects_to_update.values(): if not isinstance(obj, _Addable): raise TypeError( f"You cannot add a {obj.__class__.__name__} object to template" ) for obj in _objects_to_update.values(): self.add_one(obj, add_or_update=True)
def _get_by_gid( self, gid: str, ) -> TypeHint.addable_obj: """ Get aws object by global id. """ class_type, logic_id = gid.split("--") collection = getattr(self, _class_type_to_attr_mapper[class_type]) return collection[logic_id]
[docs] def remove_one( self, obj: TypeHint.addable_obj, ignore_not_exists: bool = False, ) -> bool: """ Remove single object from Template. If there is no a existing object with the same logic id and no flag is passed, exception will be raised. :param obj: The object you remove template. :param ignore_not_exists: if True, will ignore and pass if there's NO existing object with the same logic id :return: a boolean flag indicates that whether change is made. """ # validate argument if not isinstance(obj, _Addable): raise TypeError( f"You cannot remove a {obj.__class__.__name__} object from template" ) obj: TypeHint.addable_obj collection = getattr(self, _class_type_to_attr_mapper[obj.CLASS_TYPE]) # remove object if obj.id in collection: collection.pop(obj.id) self._deps_data_need_build_flag = True return True elif ignore_not_exists: return False else: raise AWSObjectNotExistsError.make(obj_type=obj.CLASS_TYPE, logic_id=obj.id)
[docs] def remove( self, obj: TypeHint.addable_obj, _deps_by_data: OrderedDict = None, _objects_to_remove: T.Dict[str, TypeHint.addable_obj] = None, ): """ Remove a AWS object from the template. If there are other objects depend on this object, it will also remove other objects. This method is atomic. In other word, either all objects succeed or non. """ if _objects_to_remove is None: is_first_call = True _objects_to_remove = OrderedDict() else: is_first_call = False if _deps_by_data is None: _deps_by_data = self.deps_by_data _objects_to_remove[obj.gid] = obj # handle other object depends on this for child_gid in self.deps_by_data.get(obj.gid, set()): try: child_obj = self._get_by_gid(child_gid) self.remove( child_obj, _deps_by_data=_deps_by_data, _objects_to_remove=_objects_to_remove, ) except KeyError: pass # if it is resource group, aws objects in the DependsOn list should # also be removed if isinstance(obj, ResourceGroup): for child_obj in obj.DependsOn: if (not isinstance(child_obj, ResourceGroup)) and ( child_obj.gid not in _objects_to_remove ): self.remove( child_obj, _deps_by_data=_deps_by_data, _objects_to_remove=_objects_to_remove, ) if is_first_call: # validate type before making the change for obj in _objects_to_remove.values(): if not isinstance(obj, _Addable): raise TypeError( f"You cannot remove a {obj.__class__.__name__} object to template" ) for obj in _objects_to_remove.values(): self.remove_one(obj, ignore_not_exists=True)
# --- parameter handling def get_param_values(self) -> T.Dict[str, T.Any]: return OrderedDict([(p.id, p.get_value()) for p in self.Parameters.values()]) # --- nested stack
[docs] def add_nested_stack( self, stack: T.Union[str, Stack], template: "Template", ) -> bool: """ Reference: - Nested Stack: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-nested-stacks.html """ stack_logic_id = get_id(stack) if stack_logic_id not in self.Resources: raise ValueError( "there's no AWS::CloudFormation::Stack resource defined in " "the template" ) self.NestedStack[stack_logic_id] = template return True
# --- serialization
[docs] def to_dict(self) -> dict: """ Serialize the template, convert it to python dict. """ self.__pre_serialize__() dct = get_key_value_dict(self) dct.pop("NestedStack") dct.pop("Groups") dct.pop("_deps_data_need_build_flag") dct.pop("_deps_on_data_cache") dct.pop("_deps_by_data_cache") dct.pop("_deps_sort_need_build_flag") dct.pop("_deps_sort_cache") dct = remove_id_and_empty(dct) dct = serialize(dct) return dct
[docs] def to_json( self, human_readable: bool = True, ) -> str: """ Convert template to json string. """ if human_readable: indent = 4 else: # pragma: no cover indent = None return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
[docs] def to_json_file( self, path: str, ): # pragma: no cover """ Dump template to json file. """ with open(path, "w") as f: f.write(self.to_json())
[docs] def to_yml(self) -> str: # pragma: no cover """ Convert template to yaml string """ return cfn_flip.to_yaml( self.to_json(human_readable=True), clean_up=False, long_form=False )
[docs] def to_yml_file( self, path: str, ): # pragma: no cover """ Dump template to yaml file. """ with open(path, "w") as f: f.write(self.to_yml())
def __attrs_post_init__(self): if CtfConfig.Template.enable_post_init_hook: self._system_post_init_hook() self.post_init_hook() def _system_post_init_hook(self): pass
[docs] def post_init_hook(self): """ User can overwrite this method to extend cottonformation.Template """ pass
def __pre_serialize__(self): if CtfConfig.Template.enable_pre_serialize_hook: self._system_pre_serialize_hook() self.pre_serialize_hook() def _system_pre_serialize_hook(self): self.Metadata.setdefault(MetaData.CTF, OrderedDict()) self.Metadata[MetaData.CTF][MetaData.Version] = __version__
[docs] def pre_serialize_hook(self): """ User can overwrite this method to extend cottonformation.Template """ pass
[docs] def batch_tagging( self, tags: T.Dict[str, str], mode_skip: bool = False, mode_overwrite: bool = False, mode_raise: bool = False, ): """ Batch tag all resources if supporting Tags. :param tags: key value tags in python dictionary :param mode_skip: if the key already exists, then skip it :param mode_overwrite: if the key already exists, then overwrite it with new value :param mode_raise: if the key already exists, then raise error """ for r in self.Resources.values(): if r.support_tags(): r.update_tags( tags, mode_skip=mode_skip, mode_overwrite=mode_overwrite, mode_raise=mode_raise, )
# factory method
[docs] @classmethod def from_many_objects( cls, objects: T.Iterable[TypeHint.addable_obj], ) -> "Template": """ A factory method can create a Template object from many AWS Objects. """ tpl = cls() for obj in objects: tpl.add(obj) return tpl