Overview
We will cover how to set up mutual TLS authentication and authorization on Amazon MSK.
Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. You can enable client authentication with TLS for connections and client authorization from your applications to your Amazon MSK brokers and ZooKeeper nodes.
Prerequisites
- Terraform: For creating a private CA and MSK Cluster
- AWS CLI: For creating TLS certificates (the user must have access to create a private CA, issue certificates, and create MSK cluster)
Setup TLS authentication and authorization
To use client authentication with TLS on MSK, you need to create the following resources:
- AWS Private CA
- MSK cluster with TLS encryption enabled
- Client certificates
Create AWS Private CA
AWS Private CA can be either in the same AWS account as your cluster, or in a different account. For information about AWS Private CAs, see Creating and Managing a AWS Private CA. In this setup, we will use Terraform to create a private CA.
Steps to create Private CA
- Run below Terraform code to create the Private CA.
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 4.0"
}
}
}
resource "aws_acmpca_certificate_authority" "root_ca" {
certificate_authority_configuration {
key_algorithm = "RSA_4096"
signing_algorithm = "SHA512WITHRSA"
subject {
#Update the attributes as per your need
common_name = "exp-msk-ca"
country = "US"
locality = "Seattle"
organization = "Example Corp"
organizational_unit = "Sales"
state = "WA"
}
}
type = "ROOT"
}
- Once the private CA is created, install the certificate from the AWS console.
Steps to install the certificate.
- If you are not already on the CA’s details page, open the AWS Private CA console at https://console.aws.amazon.com/acm-pca/home. On the private certificate authorities page, choose a root CA that you have created with the certificate status as Pending or Active.
- Choose Actions, and installthe CA certificate to open the Install root CA certificate page.
- Under Specify the root CA certificate parameters, specify the following certificate parameters:
- Validity — Specifies the expiration date and time for the CA certificate. The AWS Private CA default validity period for a root CA certificate is ten years.
- Signature algorithm — Specifies the signing algorithm to use when the root CA issues new certificates. Available options vary according to the AWS Region where you are creating the CA. For more information, see Compatible signing algorithms, Supported cryptographic algorithms, and SigningAlgorithm in CertificateAuthorityConfiguration.
- SHA256 RSA
- Review your settings to make sure they’re correct, then choose Confirm and install.
- The details page for the CA displays the status of the installation (success or failure) at the top. If the installation was successful, the newly completed root CA displays a status of Active in the General pane.
Create an MSK cluster that supports TLS client authentication.
Note: We highly recommend using independent AWS Private CA for each MSK cluster when you use mutual TLS to control access. Doing so will ensure that TLS certificates signed by PCAs only authenticate with a single MSK cluster.
Run the below Terraform code to create MSK cluster
Note: Update attributes as per the requirement and configurations.
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 4.0"
}
}
}
module "kafka" {
source = "cloudposse/msk-apache-kafka-cluster/aws"
# Cloud Posse recommends pinning every module to a specific version
version = "2.3.0"
name = "test-msk-cluster" #Change MSK cluster name as per your need
vpc_id = "<VPC_ID>"
subnet_ids = ["SUBNET1a","SUNBNET2b"] # Minimum 2 subnets required.
kafka_version = "3.4.0" #recommended version by AWS as of 19 Sep 2022
broker_per_zone = 1 #Number of broker per availability zone
broker_instance_type = "kafka.t3.small" #MSK instance types
broker_volume_size = 10 #Broker disk size
certificate_authority_arns = ["<CA_ARN>"] #arn of the CA that you have created in the earlier step
client_tls_auth_enabled = true
encryption_in_cluster = true
client_broker = "TLS" # Enables TLS encryption
enhanced_monitoring = "PER_TOPIC_PER_BROKER"
cloudwatch_logs_enabled = false # Enable if you need cloudwatch logs
jmx_exporter_enabled = false # Enable if you need jmx metrics
node_exporter_enabled = false # Enable if you need node metrics
associated_security_group_ids = ["${aws_security_group.kafka_sg.id}"]
allowed_security_group_ids = ["${aws_security_group.kafka_sg.id}"]
create_security_group = false
}
#-----------------------End--------------------#
resource "aws_security_group" "kafka_sg" {
name = "test-msk-cluster-sg" #Change the name as per your need
description = "Security Group for kafka cluster"
vpc_id = "<VPC_ID>"
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
}
ingress {
from_port = 2181
to_port = 2181
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
}
ingress {
from_port = 9094
to_port = 9094
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
}
# Enable if you need to add tags to MSK cluster
#tags = var.tags
# Enable if you need cloudwatch logs
# depends_on = [
# aws_cloudwatch_log_group.cw_log_group
#]
}
# Required for cloudwatch logs
# resource "aws_cloudwatch_log_group" "cw_log_group" {
# name = "blog-msk-cluster"
# #tags = var.tags
# }
output "bootstrap_url" {
value = module.kafka.bootstrap_brokers_tls
description = "Comma separated list of one or more DNS names (or IP addresses) and TLS port pairs for access to the Kafka cluster using TLS"
}
It will take 15-20 minutes to create the MSK cluster.
Note: Since the bootstrap URL will be used to communicate with the MSK cluster using the Kafka CLI or SDKs, save it from the Terraform output.
Create TLS certificates using previously created AWS Private CA
We will create two certificates, one is for admin access, and the other one is for client access. For creating certificates, a common name (CN) is required. The CN is used as a principal while granting permissions through kafka ACLs
Create admin TLS certificate
Steps to create TLS certificate
- Generate CSR and key.
openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes -subj '/CN=admin'
- Issue certificate using previously created private CA (replace <CA_ARN> with the ARN of the AWS Private CA that you created).
certArn=$(aws acm-pca issue-certificate --region <region> --certificate-authority-arn "<CA_ARN>"
--csr fileb://cert.csr
--signing-algorithm 'SHA256WITHRSA' --validity Value=180,Type='DAYS' --query
'CertificateArn' --output text)
- Get the certificate ARN issued in the previous step.
aws acm-pca get-certificate --region <region> --certificate-authority-arn
"<CA_ARN>" --certificate-arn "${certArn}" --output text | sed 's/t/n/g' >
cert.pem
- Export the certificate in pkcs12 format.
openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator
-password pass: -out admin.p12
Create client TLS certificate
- Generate CSR and key
openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes -subj
'/CN=client'
- Issue certificate using previously created private CA (replace <CA_ARN> with the ARN of the AWS Private CA that you created).
certArn=$(aws acm-pca issue-certificate --region <region>
--certificate-authority-arn "<CA_ARN>" --csr fileb://cert.csr
--signing-algorithm 'SHA256WITHRSA' --validity Value=180,Type='DAYS' --query
'CertificateArn' --output text)
- Get certificate ARN issue in the previous step.
aws acm-pca get-certificate --region <region> --certificate-authority-arn
"<CA_ARN>" --certificate-arn "${certArn}" --output text | sed 's/t/n/g' >
cert.pem
- Export the certificate in pkcs12 format.
openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator
-password pass: -out client.p12
Setup a client machine to interact with the MSK cluster
- Create an Amazon EC2 instance to use as a client machine. For simplicity, create this instance in the same VPC you used for the cluster. See Step 3: Create a client machine for an example of how to create such a client machine.
- Copy previously created certificates admin.p12 and client.p12 into the client machine.
- Install java8+ on the client machine.
- Download Kafka binaries and extract
- Create admin and client configuration files for authentication and authorization.
cat <<EOF> admin.propertie
bootstrap.servers="<BOOTSTRAP_URL>"
security.protocol=SSL
ssl.keystore.location=./admin.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=
EOF
cat <<EOF> client.propertie
bootstrap.servers="<BOOTSTRAP_URL>"
security.protocol=SSL
ssl.keystore.location=./client.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=
EOF
Test Authentication and Authorization using ACLs
Create Admin ACLs for granting admin access to clusters, topics, and groups
By default, the MSK cluster will allow everyone if no ACL is found. Here the Admin ACL will be the first ACL. The Admin user (“User:CN=admin”) will leverage on Admin ACL to grant permissions to Client User(“User:CN=client”).
ACL for managing cluster operations (Admin ACL).
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--add
--allow-principal "User:CN=admin"
--operation All
--cluster
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
ACL for managing topics permissions (Admin ACL).
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--add
--allow-principal "User:CN=admin"
--operation All
--topic "*"
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
ACL for managing group permissions (Admin ACL).
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--add
--allow-principal "User:CN=admin"
--operation All
--group "*"
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
Create a topic.
./kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>"
--create --topic test-topic --command-config admin.properties
List topic and check the topic is created.
./kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>"
--list --command-config admin.properties
Grant write permission to the topic so that client (producer) can publish messages to the topic (Use admin user for granting access to client).
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--add
--allow-principal "User:CN=client"
--operation Write
--topic "test-topic"
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
Publish messages to the topic using client user.
for x in {1..10}; do echo "message $x"; done |
./kafka_2.13-3.5.0/bin/kafka-console-producer.sh --bootstrap-server
"<BOOTSTRAP_URL>" --producer.config kafka-admin-client.properties --topic
test-topic --producer-property enable.idempotence=false
Consume messages.
Note: If you try to consume messages from the topic using a consumer group, you will get a group authorization error since the client user is not authorized to access groups.
./kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server
"<BOOTSTRAP_URL>" --topic test-topic --max-messages 2 --consumer-property
enable.auto.commit=false --consumer-property group.id=consumer-test
--from-beginning --consumer.config client.properties
Grant group permission to the client user.
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--add
--allow-principal "User:CN=client"
--operation Read
--resource-pattern-type prefixed
--group 'consumer-'
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
After providing group access, the client user should be able to consume messages from the topic using a consumer group.
This way, you can manage client access to the topics and groups.
Additional Commands
List ACL.
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--bootstrap-server "<BOOTSTRAP_URL>"
--list
--command-config admin.properties
Delete ACL (Create and delete ACL commands are same except –aad/–remove argument).
./kafka_2.13-3.5.0/bin/kafka-acls.sh
--remove
--allow-principal "User:CN=admin"
--operation All
--cluster
--bootstrap-server "<BOOTSTRAP_URL>"
--command-config admin.properties
Conclusion
AWS MSK eases the effort of managing independently hosted Kafka clusters. Users can scale Kafka brokers and storage as necessary. MSK supports TLS encryption and allows users to create TLS connections from the application to Amazon MSK brokers and ZooKeeper nodes with the help of the AWS Private CA, which enables users to create certificates for authentication.



