This example shows how to use API Gateway and Lambda to connect to Amazon MSK and run Kafka admin operations (create topic, list topics, describe topic, and get topic configuration).
Prerequisite:
- An MSK cluster with plaintext authentication
Follow the steps below to create a Lambda layer containing the kafka-python client.
- Create a folder called python on your local machine.
mkdir python
cd python
- Run the following command from the command line. It downloads kafka-python and its dependencies into the folder you just created.
pip install -t . kafka-python
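- Zip the folder into python.zip. Run the commands from the parent folder so the archive keeps the top-level python directory, which is where Lambda looks for a layer's Python packages.
cd ..
zip -r python.zip python
- Run the following AWS CLI command to create the Lambda layer: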
aws lambda publish-layer-version --layer-name kafkaadmin --description "My layer" --zip-file fileb://python.zip --compatible-runtimes python3.6 python3.7 python3.8
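- Note down the LayerVersionArn from the output (the layer ARN with a version number appended); the --layers option of create-function expects this versioned ARN. Sample:
"LayerArn": "arn:aws:lambda:ap-southeast-2:11111111111:layer:kafkaadmin",
"LayerVersionArn": "arn:aws:lambda:ap-southeast-2:11111111111:layer:kafkaadmin:1"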
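To connect to an MSK cluster inside a VPC, the Lambda function's execution role needs permission to create and manage network interfaces in that VPC (the AWS managed policy AWSLambdaVPCAccessExecutionRole covers this). Create a role with these permissions and pass its ARN when you create the Lambda function. Configure the function with the subnets and security group of your MSK cluster, and make sure the security group allows traffic to the brokers' plaintext port (9092). This example works only with plaintext authentication.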
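A minimal sketch of creating that execution role with boto3, assuming you attach the AWS managed policy AWSLambdaVPCAccessExecutionRole (CloudWatch Logs access plus the EC2 network-interface permissions a VPC-attached function needs). The function name create_vpc_lambda_role is hypothetical, and the role name is your choice.

```python
import json

# Trust policy that allows the Lambda service to assume the role.
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# AWS managed policy with the permissions a VPC-attached Lambda function needs.
VPC_ACCESS_POLICY_ARN = (
    "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
)


def create_vpc_lambda_role(role_name: str) -> str:
    """Create the execution role, attach the VPC policy, and return the role ARN."""
    import boto3  # imported lazily: only this function needs boto3 and AWS credentials

    iam = boto3.client("iam")
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(LAMBDA_TRUST_POLICY),
    )
    iam.attach_role_policy(RoleName=role_name, PolicyArn=VPC_ACCESS_POLICY_ARN)
    return role["Role"]["Arn"]
```

The returned ARN is the value for --role in the create-function command.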
- Download the Python script lambda_function.py and zip it into lambda_function.zip.
- Create the Lambda function with the AWS CLI:
aws lambda create-function \
--function-name REPLACE-FUNCTION-NAME \
--runtime python3.8 \
--role arn:aws:iam::11111111111:role/REPLACE-ROLE-NAME \
--handler lambda_function.lambda_function \
--zip-file fileb://lambda_function.zip \
--vpc-config SubnetIds=REPLACE-SUBNET-1,REPLACE-SUBNET-2,REPLACE-SUBNET-3,SecurityGroupIds=REPLACE-SG-000000 \
--layers arn:aws:lambda:ap-southeast-2:11111111111:layer:REPLACE-LAYER-NAME:REPLACE-LAYER-VERSION
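The downloaded lambda_function.py is not reproduced here. As a rough sketch only, a handler behind an API Gateway Lambda proxy integration could route on the whichfunction query parameter like this. The names lambda_handler, SUPPORTED_ACTIONS and admin_factory, and the error messages, are hypothetical; only listtopics and describetopic are shown. If you deploy this sketch instead of the downloaded script, the --handler value would be lambda_function.lambda_handler (module name, then function name).

```python
import json

# Subset of the whichfunction values used in this README.
SUPPORTED_ACTIONS = {"listtopics", "describetopic"}


def _response(status_code, payload):
    """Build the response shape that Lambda proxy integration expects."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload, default=str),  # body must be a string
    }


def _kafka_admin(bootstrap_servers):
    """Create a plaintext admin client (lazy import; needs the kafka-python layer)."""
    from kafka.admin import KafkaAdminClient

    return KafkaAdminClient(
        bootstrap_servers=bootstrap_servers.split(","),
        security_protocol="PLAINTEXT",
    )


def lambda_handler(event, context, admin_factory=_kafka_admin):
    # API Gateway passes None when the URL has no query string.
    params = event.get("queryStringParameters") or {}
    action = params.get("whichfunction")
    bootstrap = params.get("bsrv")
    topic = params.get("topicname")

    # Validate the request before opening any broker connection.
    if not bootstrap or action not in SUPPORTED_ACTIONS:
        return _response(400, {"error": "bsrv and a supported whichfunction are required"})
    if action == "describetopic" and not topic:
        return _response(400, {"error": "topicname is required"})

    admin = admin_factory(bootstrap)
    try:
        if action == "listtopics":
            result = sorted(admin.list_topics())
        else:
            result = admin.describe_topics([topic])
    finally:
        admin.close()  # release broker connections on every invocation
    return _response(200, result)
```

The admin_factory parameter exists only so the routing can be exercised with a stub client; Lambda itself calls the handler with just event and context.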
- Go to the API Gateway console: https://console.aws.amazon.com/apigateway.
- Click the "Create API" button.
- Under "REST API", click "Build".
- Select "New API" and enter a name and description for your API.
- Click the "Create API" button.
- In the "Resources" pane, click the "Actions" dropdown and choose "Create Resource".
- In the "New Resource" dialog, enter a resource name (kafkaadmin) and click "Create Resource".
- Select the new resource, then choose "Create Method" from the "Actions" dropdown.
- Choose "ANY" and save it.
- Select "Lambda Function" as the integration type. Check "Use Lambda Proxy integration".
- Enter the Lambda function name that you created earlier and click the "Save" button.
- Select "OK" to add invoke permission for the Lambda function.
- Choose "Deploy API" from the "Actions" dropdown.
- In the "Deploy API" dialog, choose or create a deployment stage (for example, test or prod) and click "Deploy". Note the stage's invoke URL; it replaces REPLACE-WITH-INVOKE-URL in the URLs below.
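Once the API is deployed, you can also call it from a script instead of a browser. A minimal sketch with the standard library, plus boto3 for looking up the broker string; the helper names are hypothetical, and parsing the response as JSON assumes the function returns a JSON body. Encoding the parameters matters because the bootstrap broker string contains ":" and "," characters; API Gateway decodes them before passing the query string to Lambda.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def get_bootstrap_brokers(cluster_arn: str) -> str:
    """Return the plaintext bootstrap broker string for an MSK cluster."""
    import boto3  # lazy import: only this helper needs boto3 and AWS credentials

    kafka = boto3.client("kafka")
    return kafka.get_bootstrap_brokers(ClusterArn=cluster_arn)["BootstrapBrokerString"]


def build_admin_url(invoke_url: str, **params: str) -> str:
    """Append URL-encoded query parameters to the /kafkaadmin resource."""
    # urlencode percent-encodes ':' and ',' in the broker string.
    return f"{invoke_url.rstrip('/')}/kafkaadmin?{urlencode(params)}"


def call_admin(invoke_url: str, **params: str):
    """Call the API and parse the response body as JSON (assumed format)."""
    with urlopen(build_admin_url(invoke_url, **params), timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example: call_admin("REPLACE-WITH-INVOKE-URL", whichfunction="listtopics", bsrv=get_bootstrap_brokers("REPLACE-CLUSTER-ARN")).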
To list all topics in your Kafka cluster, open the following URL in your browser:
REPLACE-WITH-INVOKE-URL/kafkaadmin?whichfunction=listtopics&bsrv=REPLACE-BOOTSTRAP-BROKERS
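** To get the bootstrap broker string for your MSK cluster, see the MSK documentation, or run aws kafka get-bootstrap-brokers --cluster-arn REPLACE-CLUSTER-ARN and use the BootstrapBrokerString value.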
Sample output:
To describe a topic, open the following URL in your browser:
REPLACE-WITH-INVOKE-URL/kafkaadmin?whichfunction=describetopic&bsrv=REPLACE-BOOTSTRAP-BROKERS&topicname=REPLACE-TOPIC-NAME
Sample output:
To view a topic's configuration, open the following URL in your browser:
REPLACE-WITH-INVOKE-URL/kafkaadmin?whichfunction=gettopicconfig&bsrv=REPLACE-BOOTSTRAP-BROKERS&topicname=REPLACE-TOPIC-NAME
Sample output:
If your cluster has tiered storage enabled, you can create a tiered storage topic by opening the following URL in your browser. It creates a topic with 259200000 ms (3 days) of total retention, 3600000 ms (1 hour) of local retention, 10 partitions, and a replication factor of 2:
REPLACE-WITH-INVOKE-URL/kafkaadmin?whichfunction=createtopic&bsrv=REPLACE-BOOTSTRAP-BROKERS&topicname=REPLACE-TOPIC-NAME&retention=259200000&localretention=3600000&partition=10&rf=2
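The retention rule can be expressed as topic-level configs. The sketch below is not the original lambda_function.py; it assumes the MSK tiered storage topic configs remote.storage.enable, local.retention.ms and retention.ms, and the helper names build_topic_configs and create_topic are hypothetical. Tiered storage is switched on only when the local retention is shorter than the total retention.

```python
def build_topic_configs(retention_ms: int, local_retention_ms: int) -> dict:
    """Return topic configs, enabling tiered storage only when local retention
    (data kept on broker disks) is shorter than total retention."""
    configs = {"retention.ms": str(retention_ms)}
    if local_retention_ms < retention_ms:
        configs["remote.storage.enable"] = "true"
        configs["local.retention.ms"] = str(local_retention_ms)
    return configs


def create_topic(bootstrap_servers: str, name: str, partitions: int,
                 replication_factor: int, retention_ms: int, local_retention_ms: int):
    """Create the topic with kafka-python (lazy import; needs the layer)."""
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers.split(","))
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replication_factor,
            topic_configs=build_topic_configs(retention_ms, local_retention_ms),
        )
        return admin.create_topics([topic])
    finally:
        admin.close()
```

With the values from the URL above, build_topic_configs(259200000, 3600000) enables tiered storage; passing the same value for both retentions sets only retention.ms.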
To create a topic without tiered storage, open the following URL in your browser. It creates a topic with 259200000 ms (3 days) of retention, 10 partitions, and a replication factor of 2. The createtopic function enables tiered storage only when the local retention is lower than the total retention, so setting both to the same value leaves tiered storage disabled:
REPLACE-WITH-INVOKE-URL/kafkaadmin?whichfunction=createtopic&bsrv=REPLACE-BOOTSTRAP-BROKERS&topicname=REPLACE-TOPIC-NAME&retention=259200000&localretention=259200000&partition=10&rf=2
Sample output: