Streaming Amazon DynamoDB data into a centralized data lake (S3)

DynamoDB Streams captures item-level changes in the DynamoDB table. Kinesis Data Streams and Kinesis Data Firehose then deliver the changes to an S3 bucket, with a Lambda function transforming each record before it is written to S3.

Architecture diagram: DynamoDB table → Kinesis Data Stream → Lambda transform → Kinesis Data Firehose → S3 data lake

Step 1: Create a DynamoDB table.

Create a customers table with customers_id as the partition key.
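
The same table can also be created with a short boto3 script. This is a minimal sketch, assuming on-demand billing and a placeholder region; adjust both to your environment.

          # Minimal boto3 sketch; region and billing mode are assumptions
          import boto3

          dynamodb = boto3.client('dynamodb', region_name='us-east-1')

          dynamodb.create_table(
              TableName='customers',
              AttributeDefinitions=[{'AttributeName': 'customers_id', 'AttributeType': 'N'}],
              KeySchema=[{'AttributeName': 'customers_id', 'KeyType': 'HASH'}],
              BillingMode='PAY_PER_REQUEST'  # on-demand capacity, no provisioning needed
          )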

Step 2: Create a Kinesis data stream

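A boto3 equivalent of this step might look like the sketch below; the stream name and shard count are assumptions for this demo.

          # Minimal boto3 sketch; stream name and shard count are assumptions
          import boto3

          kinesis = boto3.client('kinesis', region_name='us-east-1')

          kinesis.create_stream(
              StreamName='customers-ddb-stream',
              ShardCount=1  # a single shard is enough for this demo workload
          )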

Step 3: Create a Lambda function and an S3 bucket


Lambda function script

          # Firehose transformation Lambda: adds a newline after each record coming
          # from the Kinesis data stream so that records land in S3 one per line.
          import base64

          def lambda_handler(event, context):
              output = []  # build a fresh output list per invocation

              for record in event['records']:
                  # Firehose delivers each record's payload base64-encoded.
                  payload = base64.b64decode(record['data']).decode('utf-8')
                  print('payload:', payload)

                  # Append a newline, then re-encode the payload for Firehose.
                  row_w_newline = payload + "\n"
                  encoded = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

                  output.append({
                      'recordId': record['recordId'],
                      'result': 'Ok',
                      'data': encoded
                  })

              print('Processed {} records.'.format(len(event['records'])))

              return {'records': output}
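
To sanity-check the transformation locally, you can feed the handler a hand-built event in the shape Firehose uses for Lambda transforms (recordId plus base64-encoded data). The payload below is only an illustrative stand-in for a real DynamoDB change record, and it assumes the handler above is in scope.

          # Quick local test with an illustrative payload (not a real DynamoDB change record)
          import base64, json

          sample_event = {
              'records': [{
                  'recordId': 'record-1',
                  'data': base64.b64encode(json.dumps(
                      {'eventName': 'INSERT', 'customers_id': 1}).encode('utf-8')).decode('utf-8')
              }]
          }

          print(lambda_handler(sample_event, None))  # each returned record now ends with "\n"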

Step 4: Create the Kinesis Data Firehose delivery stream


Set the source to the Kinesis data stream and the destination to Amazon S3.

Enable "Transform source records with AWS Lambda" and select the function created in Step 3.

Select the S3 bucket created in Step 3 as the destination.

Configure the buffer size and interval so data is delivered once 1 MB has accumulated or 60 seconds have elapsed, whichever comes first.

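The same delivery stream can be scripted with boto3. In this sketch every ARN is a placeholder, and the buffering hints mirror the 1 MB / 60 seconds configuration described above.

          # Minimal boto3 sketch; all ARNs are placeholders
          import boto3

          firehose = boto3.client('firehose', region_name='us-east-1')

          firehose.create_delivery_stream(
              DeliveryStreamName='customers-ddb-to-s3',
              DeliveryStreamType='KinesisStreamAsSource',
              KinesisStreamSourceConfiguration={
                  'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/customers-ddb-stream',
                  'RoleARN': 'arn:aws:iam::123456789012:role/firehose-read-kinesis'
              },
              ExtendedS3DestinationConfiguration={
                  'BucketARN': 'arn:aws:s3:::my-ddb-stream-bucket',
                  'RoleARN': 'arn:aws:iam::123456789012:role/firehose-write-s3',
                  'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
                  'ProcessingConfiguration': {
                      'Enabled': True,
                      'Processors': [{
                          'Type': 'Lambda',
                          'Parameters': [{
                              'ParameterName': 'LambdaArn',
                              'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:firehose-newline-transform'
                          }]
                      }]
                  }
              }
          )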

Step 5: Set up the DynamoDB stream with Kinesis

From the DynamoDB table settings, turn on the Amazon Kinesis data stream destination, selecting the stream created in Step 2.
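
This console toggle corresponds to the DynamoDB EnableKinesisStreamingDestination API; a boto3 sketch follows, with the stream ARN as a placeholder.

          # Minimal boto3 sketch; the stream ARN is a placeholder
          import boto3

          dynamodb = boto3.client('dynamodb', region_name='us-east-1')

          dynamodb.enable_kinesis_streaming_destination(
              TableName='customers',
              StreamArn='arn:aws:kinesis:us-east-1:123456789012:stream/customers-ddb-stream'
          )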

Step 6: Insert data into the table


SQL (PartiQL) queries for DynamoDB

INSERT INTO "customers" VALUE {'customers_id':1, 'name':'Baba Li', 'age':20,'gender':'M'}
INSERT INTO "customers" VALUE {'customers_id':2, 'name':'Lucky Bill', 'age':24,'gender':'M'}
INSERT INTO "customers" VALUE {'customers_id':3, 'name':'Mom Ma', 'age':50,'gender':'F'}
INSERT INTO "customers" VALUE {'customers_id':4, 'name':'Locker Su', 'age':30,'gender':'M'}
INSERT INTO "customers" VALUE {'customers_id':5, 'name':'Abdel ly', 'age':41,'gender':'F'}
INSERT INTO "customers" VALUE {'customers_id':6, 'name':'Abou Sar', 'age':35,'gender':'F'}

UPDATE "customers" SET age=26 WHERE customers_id=3

SELECT * FROM "customers";
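
These statements run in the PartiQL editor of the DynamoDB console; the same thing can be done programmatically with ExecuteStatement, for example:

          # Running one of the PartiQL statements with boto3
          import boto3

          dynamodb = boto3.client('dynamodb', region_name='us-east-1')

          dynamodb.execute_statement(
              Statement="""INSERT INTO "customers" VALUE {'customers_id': 1, 'name': 'Baba Li', 'age': 20, 'gender': 'M'}"""
          )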

Step 7: Check the data in the S3 bucket

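One way to verify the delivery is to list the bucket and print the newline-delimited records Firehose wrote; the bucket name below is a placeholder.

          # Read back the delivered objects; the bucket name is a placeholder
          import boto3

          s3 = boto3.client('s3')
          bucket = 'my-ddb-stream-bucket'

          for obj in s3.list_objects_v2(Bucket=bucket).get('Contents', []):
              body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read().decode('utf-8')
              for line in body.splitlines():  # one change record per line, thanks to the Lambda transform
                  print(obj['Key'], line)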
