Event Source Using Apache Kafka With AWS Lambda Powered By Spring Cloud Stream/Function

Event-driven architecture is a software architecture paradigm promoting the production, detection, consumption of, and reaction to events. It helps developers build decoupled, responsive applications, which is why it has been widely adopted as monoliths are broken down into microservices.

Spring Cloud Stream improves your productivity when working with Apache Kafka, RabbitMQ, Azure Event Hubs, and more, providing three key abstractions (destination binders, bindings, and the message) to simplify your code.

Spring Cloud Function enables you to write functions once and run them anywhere (AWS, Azure, etc.) while continuing to use all the familiar and comprehensive Spring APIs. You can chain multiple functions together to create new capabilities.
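
To illustrate chaining (a minimal, hypothetical sketch that is not part of this repository), two function beans can be composed with the pipe operator in the function definition property:

@Bean
public Function<String, String> hello() {
	return name -> "Hello " + name;
}

@Bean
public Function<String, String> upper() {
	return input -> input.toUpperCase();
}

Setting spring.cloud.function.definition to hello|upper routes the output of hello into upper, so "jun" becomes "HELLO JUN".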

The main goal of this repository is to show how we can create a serverless function using AWS Lambda and trigger it via an event source such as Apache Kafka. If you want to learn the basics of Spring Cloud Stream/Function with the Kafka binder, please visit my previous tutorial.

AWS Lambda now allows customers to build applications that can be triggered by messages in an Apache Kafka cluster hosted on any infrastructure, either a self-managed Apache Kafka cluster or Amazon Managed Streaming for Apache Kafka (MSK). Customers can keep their existing Apache Kafka cluster as-is and quickly build Kafka consumer applications with Lambda without needing to worry about provisioning or managing servers.

Please refer to the official AWS documentation on how to use AWS Lambda with an Apache Kafka cluster for more details.

I also recommend this AWS blog post, since it includes examples of how to completely configure a self-managed Apache Kafka cluster as an event source for AWS Lambda.

Getting Started

Our main application is pretty simple, with a traditional bean definition:

@SpringBootApplication
@Log4j2
public class SpringcloudfuncApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringcloudfuncApplication.class, args); 
	}

	@Bean
	public Function<Message<String>, String> greeter() {
		return (input) -> {
			log.info("Hello there {}", input.getPayload());
			return "Hello there " + input.getPayload();
		};
	}

}

It simply returns a greeting based on the messages sent to a Kafka topic. Next, this is our application.yml file:

spring:
  cloud:
    function:
      definition: greeter
    stream:
      kafka:
        default:
          consumer:
            startOffset: earliest
        binder:
          brokers: ${KAFKA_BROKERS:localhost:9092}
      bindings:
        greeter-in-0:
          destination: topic-names
        greeter-out-0:
          destination: topic-greetings

Given this, if you recall from our previous tutorial, we no longer need to define the SPRING_CLOUD_FUNCTION_DEFINITION environment variable when creating the AWS Lambda function, since the function definition is already configured in the YAML file above (the binding names follow Spring Cloud Stream's <functionName>-in-<index>/<functionName>-out-<index> convention). However, we need to make sure we define a value for the KAFKA_BROKERS environment variable: the comma-delimited private IPv4 DNS names (with port) of the EC2 instances acting as Kafka brokers.
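
For example, with two brokers the variable would look like this (the hostnames below are hypothetical placeholders):

KAFKA_BROKERS=ip-10-0-1-11.ap-southeast-1.compute.internal:9092,ip-10-0-1-12.ap-southeast-1.compute.internal:9092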

Create Self-Managed Apache Kafka Cluster On AWS EC2 Instance

A. Create The AWS Networking, Security Groups and EC2

  1. Create a VPC with public and private subnets and a NAT Gateway (or NAT instance) that allows internet access on the routing table. See this for more details.
  2. Create a Security Group for your Kafka cluster with the following inbound rules (see the CLI sketch after this list):
Custom TCP: port 2181 from all sources.
Custom TCP: port 9092 from all sources.
SSH: restricted to your own IP address or the security group of your bastion server.
  3. Create an EC2 instance and assign the private subnet you created in step 1.
  4. Assign an IAM role with the AmazonSSMManagedInstanceCore policy.
  5. Assign the security group you created in step 2.
  6. Once the EC2 instance is up and running, connect to it either via SSH or via SSM.
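
If you prefer the AWS CLI over the console, the security group from step 2 can be created roughly as follows (a sketch only; the VPC ID, group ID, and CIDRs below are hypothetical placeholders):

# Hypothetical IDs and CIDRs - replace with your own VPC setup
aws ec2 create-security-group --group-name kafka-sg \
  --description "Kafka cluster" --vpc-id vpc-0123456789abcdef0
aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 2181 --cidr 0.0.0.0/0
aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 9092 --cidr 0.0.0.0/0
aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 22 --cidr 203.0.113.10/32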

B. Install/Configure Apache Kafka/Zookeeper

  1. Install Java
sudo yum update -y
sudo amazon-linux-extras install java-openjdk11
  2. Download and Extract Apache Kafka
wget http://www-us.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar xzf kafka_2.13-2.7.0.tgz
sudo mv kafka_2.13-2.7.0 /usr/local/kafka

Our next steps will be to create systemd unit files for the Zookeeper and Kafka. This will help us to manage Kafka/Zookeeper to run as services using the systemctl command.

  3. Setup the Zookeeper systemd unit file. Run the command:
sudo vim /etc/systemd/system/zookeeper.service

Paste below content:

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Save and exit

  4. Setup the Kafka systemd unit file. Run the command:
sudo vim /etc/systemd/system/kafka.service

Paste below content:

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.9.11-0.amzn2.0.1.x86_64"
Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms256M"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Save and exit

Note: The Environment fields above depend on your setup. In my case, I use a free-tier EC2 instance type with limited memory, which explains the value of KAFKA_HEAP_OPTS. Also, the JAVA_HOME value depends on your JDK installation path; you can check it by running the command update-alternatives --config java.

  5. Reload the systemd daemon to apply the new changes.
sudo systemctl daemon-reload
  6. Start Zookeeper Server
sudo systemctl start zookeeper

You can check whether it is active by running the command below:

sudo systemctl status zookeeper
  7. Start Kafka Server
sudo systemctl start kafka

You can check whether it is active by running the command below:

sudo systemctl status kafka
  8. Create the topics in Kafka. Spring Boot would create them automatically, but since we will use one as the trigger for our AWS Lambda, we might as well create them now and configure our Spring Boot application to subscribe to them via the application.yml file. We will create two topics: topic-names and topic-greetings
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-names
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-greetings

Note: The replication-factor describes how many copies of the data will be created; since we are running a single instance, keep this value at 1. Set the partitions option to the number of brokers you want your data to be split between; since we are running a single broker, keep this value at 1.

To check all the topics under the Kafka cluster:

cd /usr/local/kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181
  9. Send Messages to Kafka - Producer
cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-names

then type a few messages (pressing Enter after each) into the console to send them to the server

  10. Receive Messages from Kafka - Consumer
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-names --from-beginning

You should see the messages you sent in step 9.

Configure AWS Lambda With Apache Kafka As Trigger

You can visit my previous basic tutorial on how to build your Spring Cloud Function as a shaded JAR and then use it to create an AWS Lambda function.
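
For reference, the shaded JAR is typically produced with the spring-boot-thin-layout dependency and the maven-shade-plugin using an aws classifier, following the Spring Cloud Function samples; the snippet below is a sketch, and versions may differ from this repository's pom.xml:

<!-- Typical packaging setup from the Spring Cloud Function AWS samples (illustrative) -->
<plugin>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-maven-plugin</artifactId>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot.experimental</groupId>
			<artifactId>spring-boot-thin-layout</artifactId>
			<version>1.0.26.RELEASE</version>
		</dependency>
	</dependencies>
</plugin>
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-shade-plugin</artifactId>
	<configuration>
		<createDependencyReducedPom>false</createDependencyReducedPom>
		<shadedArtifactAttached>true</shadedArtifactAttached>
		<shadedClassifierName>aws</shadedClassifierName>
	</configuration>
</plugin>

Running mvn clean package then produces the -aws.jar artifact used in step 2 below.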

  1. Go to the AWS Lambda Console, then create a function by providing a unique name (e.g., MyGreeterFunc) and a runtime environment (e.g., Java 11)
  2. Once it is successfully created, set up the configuration by uploading the AWS-deployable shaded JAR (springcloudfunc-0.0.1-SNAPSHOT-aws.jar) under the Function code section
  3. Update the Handler field under the Runtime settings section and paste the default handler below, provided by Spring Cloud Function
org.springframework.cloud.function.adapter.aws.FunctionInvoker::handleRequest
  4. Edit the Environment variables and add a new entry KAFKA_BROKERS, which is explained under the Getting Started section above. Its value is the comma-delimited list of private IPv4 DNS:port values of your EC2 instances acting as Apache Kafka brokers.
KAFKA_BROKERS=ip-10-0-1-199.ap-southeast-1.compute.internal:9092
  5. Edit Basic settings and increase the timeout from 15 seconds to at least 1 minute. Feel free to add more memory (e.g., 1024 MB).
  6. Now go to the Permissions tab and click the role name to open the IAM console. Aside from the existing AWSLambdaBasicExecutionRole policy, add a new inline policy named SelfHostedKafkaPolicy with the following permissions:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

Then save the policy.

  7. Back in the Lambda function, go to the Configuration tab. In the Designer panel, choose Add trigger.
  8. In the dropdown, select Apache Kafka:
For Bootstrap servers, add each EC2 instance's (Kafka broker) private IPv4 DNS address with port 9092 appended.
For Topic name, enter 'topic-names'.
Enter your preferred batch size and starting position values.
For VPC, select the VPC you created earlier in section A above.
For VPC subnets, select the private subnet from section A above.
For VPC security groups, select the Kafka security group from section A above.
The Enable trigger tickbox should be checked.
Choose Add.
  9. Wait for the new trigger to become enabled:
Apache Kafka: Endpoints: [ip-10-0-1-199.ap-southeast-1.compute.internal:9092] (Enabled)

Testing The AWS Lambda Function

  1. Send Messages to Kafka topic "topic-names" - Producer
cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-names

then type a few messages (pressing Enter after each) into the console to send them to the server (e.g., Jun King)

  2. In the AWS Lambda function, select the Monitoring tab, then choose View logs in CloudWatch. In the latest log stream, you should see the original event in the following logs:

Note: Make sure to wait for a while, as this is an event-driven flow and it takes some time for Spring Boot to initialize.

2021-03-02 12:00:45.912  INFO 8 --- [           main] o.s.c.f.a.aws.CustomRuntimeEventLoop     : Located function greeter
2021-03-02 12:00:45.918  INFO 8 --- [           main] c.j.s.s.SpringcloudfuncApplication       : Hello there {
    "eventSource": "SelfManagedKafka",
    "bootstrapServers": "ip-10-0-1-199.ap-southeast-1.compute.internal:9092",
    "records": {
        "topic-names-0": [
            {
                "topic": "topic-names",
                "partition": 0,
                "offset": 13,
                "timestamp": 1614686444377,
                "timestampType": "CREATE_TIME",
                "value": "SnVuIEtpbmc="
            }
        ]
    }
}

One thing to notice in the above response is that we received "Hello there" plus the raw JSON event payload, with no plain "Jun King" in it. The actual message is in the following key-value pair under the topic-names-0 array:

"value": "SnVuIEtpbmc="

You can use this online Base64 decoder, enter the value "SnVuIEtpbmc=", and click Decode; you will get our expected String message "Jun King". I have posted a question on Stack Overflow about an initial failure with the AWS Lambda trigger, with a follow-up question on a possible approach to this payload: we could simply decode it in our application code, but that would mean traversing the event and making our code AWS-infrastructure-aware.
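
If you do choose to decode it in code, it is a one-liner with java.util.Base64; the helper below is a minimal, hypothetical sketch and not part of this repository:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: decodes the Base64-encoded "value" field of a
// self-managed Kafka event record, e.g. "SnVuIEtpbmc=" -> "Jun King"
public final class KafkaRecordDecoder {

	public static String decodeValue(String base64Value) {
		byte[] raw = Base64.getDecoder().decode(base64Value);
		return new String(raw, StandardCharsets.UTF_8);
	}

	public static void main(String[] args) {
		System.out.println(decodeValue("SnVuIEtpbmc=")); // prints: Jun King
	}
}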

In any case, producing another message will trigger our newly created AWS Lambda function. Thus, we have successfully created an event-triggered AWS Lambda function running a Spring Boot application and listening to a self-managed Apache Kafka cluster.

Contact me at junbetterway

Happy coding!!!
