Source code for cottonformation.core.helpers.awslambda

# -*- coding: utf-8 -*-

from typing import Union
from ...res import (
    awslambda, s3, kinesis, kinesisfirehose, sns, sqs, dynamodb,
    codecommit, events, msk,
)
from ..model import AWS_ACCOUNT_ID, AWS_REGION, Sub


[docs]class LambdaRuntime: """ Aws Lambda related constant helpers. This data is based on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html """ nodejs14 = "nodejs14.x" nodejs12 = "nodejs12.x" nodejs10 = "nodejs10.x" python39 = "python3.9" python38 = "python3.8" python37 = "python3.7" python36 = "python3.6" python27 = "python2.7" ruby27 = "ruby2.7" ruby25 = "ruby2.5" java11 = "java11" java8amzlinux2 = "java8.al2" java8amzlinux1 = "java8" go1x = "go1.x" dotnet31 = "dotnetcore3.1" dotnet21 = "dotnetcore2.1" custom_amzlinux2 = "provided.al2" custom_amzlinux1 = "provided"
def _to_func_arn(func: Union[str, awslambda.Function]): if isinstance(func, str): function_name = Sub( string="arn:aws:lambda:${aws_region}:${aws_account_id}:function:${func_name}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, func_name=func, ), ) else: function_name = func.rv_Arn return function_name
[docs]def create_permission_for_s3_event( logic_id: str, func: Union[str, awslambda.Function], bucket: Union[str, s3.Bucket], ) -> awslambda.Permission: """ .. note:: The s3 bucket has to be in the same region of lambda function. .. note:: The source arn has to be a bucket without any prefix, otherwise it won't pass the validation. Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html """ func_arn = _to_func_arn(func) if isinstance(bucket, str): if bucket.startswith("arn:aws:s3:::"): source_arn = bucket else: source_arn = f"arn:aws:s3:::{bucket}" else: source_arn = Sub( string="arn:aws:s3:::${bucket}", data=dict(bucket=bucket.ref()) ) return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="s3.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_kinesis_data_stream( logic_id: str, func: Union[str, awslambda.Function], data_stream: Union[str, kinesis.Stream] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html """ func_arn = _to_func_arn(func) if isinstance(data_stream, str): if data_stream.startswith("arn:aws:kinesis:"): source_arn = data_stream else: source_arn = Sub( string="arn:aws:kinesis:${aws_region}:${aws_account_id}:stream/${data_stream}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, data_stream=data_stream, ) ) else: source_arn = data_stream.rv_Arn return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="kinesis.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_firehose_delivery_stream( logic_id: str, func: Union[str, awslambda.Function], delivery_stream: Union[str, kinesisfirehose.DeliveryStream] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html """ func_arn = _to_func_arn(func) if isinstance(delivery_stream, str): if delivery_stream.startswith("arn:aws:firehose:"): source_arn = delivery_stream else: source_arn = Sub( string="arn:aws:firehose:${aws_region}:${aws_account_id}:deliverystream/${delivery_stream}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, delivery_stream=delivery_stream, ) ) else: source_arn = delivery_stream.rv_Arn return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="firehose.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_sns( logic_id: str, func: Union[str, awslambda.Function], topic: Union[str, sns.Topic] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html """ func_arn = _to_func_arn(func) if isinstance(topic, str): if topic.startswith("arn:aws:sns:"): source_arn = topic else: source_arn = Sub( string="arn:aws:sns:${aws_region}:${aws_account_id}:${sns_topic}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, sns_topic=topic, ) ) else: source_arn = topic.ref() return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="sns.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_sqs( logic_id: str, func: Union[str, awslambda.Function], queue: Union[str, sqs.Queue] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html """ func_arn = _to_func_arn(func) if isinstance(queue, str): if queue.startswith("arn:aws:sqs:"): source_arn = queue else: source_arn = Sub( string="arn:aws:sqs:${aws_region}:${aws_account_id}:${queue}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, queue=queue, ) ) else: source_arn = queue.ref() return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="sqs.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_dynamodb( logic_id: str, func: Union[str, awslambda.Function], table: Union[str, dynamodb.Table] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html """ func_arn = _to_func_arn(func) if isinstance(table, str): if table.startswith("arn:aws:dynamodb:"): source_arn = table else: source_arn = Sub( string="arn:aws:dynamodb:${aws_region}:${aws_account_id}:table/${table}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, table=table, ) ) else: source_arn = table.rv_Arn return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="dynamodb.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_code_commit( logic_id: str, func: Union[str, awslambda.Function], repo: Union[str, codecommit.Repository] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html """ func_arn = _to_func_arn(func) if isinstance(repo, str): if repo.startswith("arn:aws:codecommit:"): source_arn = repo else: source_arn = Sub( string="arn:aws:codecommit:${aws_region}:${aws_account_id}:${repo}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, repo=repo, ) ) else: source_arn = repo.rv_Arn return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="codecommit.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_cloudwatch_event( logic_id: str, func: Union[str, awslambda.Function], rule: Union[str, events.Rule] ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchevents.html """ func_arn = _to_func_arn(func) if isinstance(rule, str): if rule.startswith("arn:aws:events:"): source_arn = rule else: source_arn = Sub( string="arn:aws:events:${aws_region}:${aws_account_id}:rule/${rule}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, rule=rule, ) ) else: source_arn = rule.rv_Arn return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="events.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )
[docs]def create_permission_for_msk( logic_id: str, func: Union[str, awslambda.Function], cluster: Union[str, msk.Cluster], topic: str, cluster_uuid: str = None, ): """ Ref: - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html """ func_arn = _to_func_arn(func) if isinstance(cluster, str): if cluster.startswith("arn:aws:kafka:"): source_arn = cluster else: source_arn = Sub( string="arn:aws:kafka:${aws_region}:${aws_account_id}:topic/${cluster_name}/${cluster_uuid}/${topic}", data=dict( aws_account_id=AWS_ACCOUNT_ID, aws_region=AWS_REGION, cluster_name=cluster, cluster_uuid=cluster_uuid, topic=topic, ) ) else: source_arn = Sub( string="${cluster_arn}/${topic}", data=dict( cluster_arn=cluster.ref(), topic=topic, ) ) return awslambda.Permission( logic_id, rp_FunctionName=func_arn, rp_Action="lambda:InvokeFunction", rp_Principal="kafka.amazonaws.com", p_SourceArn=source_arn, p_SourceAccount=AWS_ACCOUNT_ID, )