feat(connector): add DynamoDB sink #16670
Conversation
src/connector/src/sink/dynamodb.rs
| ScalarRefImpl::Struct(_)
| ScalarRefImpl::Jsonb(_)) => AttributeValue::S(string.to_text()),
- I'm not sure; must the struct here be named, since it comes from `create sink`?
- Does this mean providing a `TimestampHandlingMode`-like option for the `jsonb` format in the DynamoDB sink?
You can consider a struct as a list of named key-value pairs, right?

struct <
  id varchar,
  name varchar
>

Map<String, Value> {
  "id": "123",
  "name": "jinser"
}
Yes, I think the current implementation works like this.
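For illustration, a minimal self-contained sketch of that struct-to-map correspondence using the `aws-sdk-dynamodb` `AttributeValue` type (the function name is hypothetical; the field names and values are taken from the example above, not from the PR's code):

```rust
use std::collections::HashMap;

use aws_sdk_dynamodb::types::AttributeValue;

// Hypothetical sketch, not the PR's code: a named struct value becomes a
// DynamoDB map attribute with one entry per field.
fn struct_as_map_attr() -> AttributeValue {
    let mut fields = HashMap::new();
    fields.insert("id".to_string(), AttributeValue::S("123".to_string()));
    fields.insert("name".to_string(), AttributeValue::S("jinser".to_string()));
    AttributeValue::M(fields)
}
```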
src/connector/src/sink/dynamodb.rs
let Some(scalar_ref) = scalar_ref else {
    return Ok(AttributeValue::Null(true));
};
let attr = match (data_type, scalar_ref) {
It seems that matching on `data_type` alone could make the code a bit cleaner.
I'm worried that the `DataType` may differ from the `ScalarRefImpl` variant, and matching only on `data_type` would require calling `scalar_ref.into_foo()` (which could panic in some cases). Do you mean that this function `map_data_type` is only used here and does not need to consider the situations where the two variants differ?
I think each `DataType` must have only one possible corresponding `ScalarRefImpl`, so there's no need to match on `ScalarRefImpl`.
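A rough sketch of what matching on `data_type` alone could look like, assuming accessors following the `into_foo()` pattern mentioned above (the function body is illustrative, not the PR's code):

```rust
use aws_sdk_dynamodb::types::AttributeValue;
use risingwave_common::types::{DataType, ScalarRefImpl};

// Sketch only: the into_* accessors panic if the scalar disagrees with the
// declared type, which the argument above says cannot happen because each
// DataType has exactly one corresponding ScalarRefImpl variant.
fn map_value(data_type: &DataType, scalar: ScalarRefImpl<'_>) -> AttributeValue {
    match data_type {
        DataType::Boolean => AttributeValue::Bool(scalar.into_bool()),
        DataType::Int32 => AttributeValue::N(scalar.into_int32().to_string()),
        DataType::Varchar => AttributeValue::S(scalar.into_utf8().to_owned()),
        // ... one arm per remaining DataType variant
        _ => unimplemented!("remaining variants elided in this sketch"),
    }
}
```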
async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> {
    for (op, row) in chunk.rows() {
        match op {
            Op::Insert | Op::UpdateInsert => {
Have you ever tested an update event?

In `self.payload_writer.write_chunk()`, you apply all the insert events before the deletes, which effectively means the `UpdateInsert` happens before the `UpdateDelete`. IIUC, the rows will end up deleted rather than updated.

Here, I'd prefer to handle `UpdateInsert` and `UpdateDelete` as a whole, i.e. convert them into a single put of the KV into DynamoDB. There is no reason not to do so.

But keep in mind that this won't really solve the problem. Please carefully consider both of the following cases:

- An `Insert` comes after a `Delete` on the same key
- A `Delete` comes after an `Insert` on the same key

Perhaps the current implementation of `self.payload_writer.write_chunk()` won't work.
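To make the ordering concern concrete, here is a minimal, self-contained sketch (all names are hypothetical, not the PR's code) of buffering a chunk and keeping only the last operation per key before flushing:

```rust
use std::collections::HashMap;

// `Op` mirrors the stream op enum used above; Key/Item are stand-ins for
// the real DynamoDB key and item types.
#[derive(Clone, Copy)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

type Key = String;
type Item = String;

enum Request {
    Put(Item),
    Delete(Key),
}

// Keep only the last operation per key: "Delete then Insert" ends as a put,
// "Insert then Delete" ends as a delete, and an UpdateDelete/UpdateInsert
// pair collapses into a single put (the UpdateDelete itself is dropped
// because the UpdateInsert that follows carries the new value).
fn collapse(ops: Vec<(Op, Key, Item)>) -> Vec<Request> {
    let mut last: HashMap<Key, Request> = HashMap::new();
    for (op, key, item) in ops {
        match op {
            Op::Insert | Op::UpdateInsert => {
                last.insert(key, Request::Put(item));
            }
            Op::Delete => {
                last.insert(key.clone(), Request::Delete(key));
            }
            Op::UpdateDelete => {}
        }
    }
    last.into_values().collect()
}
```

With this collapsing, the flush order within a batch no longer matters for keys touched by both an insert and a delete.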
#[serde(rename = "aws.region")] | ||
pub stream_region: String, | ||
#[serde(rename = "aws.endpoint")] | ||
pub endpoint: Option<String>, | ||
#[serde(rename = "aws.credentials.access_key_id")] | ||
pub credentials_access_key: Option<String>, | ||
#[serde(rename = "aws.credentials.secret_access_key")] | ||
pub credentials_secret_access_key: Option<String>, | ||
#[serde(rename = "aws.credentials.session_token")] | ||
pub session_token: Option<String>, | ||
#[serde(rename = "aws.credentials.role.arn")] | ||
pub assume_role_arn: Option<String>, | ||
#[serde(rename = "aws.credentials.role.external_id")] | ||
pub assume_role_external_id: Option<String>, |
You can just put `AwsAuthProps` here.
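A sketch of that suggestion (struct and field names here are assumptions; the stand-in `AwsAuthProps` only echoes the renames from the snippet above so the example is self-contained):

```rust
use serde::Deserialize;

// Stand-in for the connector crate's real AwsAuthProps, shown only to keep
// this sketch self-contained; the actual struct already carries such renames.
#[derive(Deserialize)]
pub struct AwsAuthProps {
    #[serde(rename = "aws.region")]
    pub region: Option<String>,
    #[serde(rename = "aws.endpoint")]
    pub endpoint: Option<String>,
    // ... credentials and role fields as in the snippet above
}

// The suggestion: flatten the shared struct instead of redeclaring each
// aws.* field on the sink config.
#[derive(Deserialize)]
pub struct DynamoDbConfig {
    pub table: String,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}
```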
refactor dynamodb sink to use batch_write_item
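As a reference point for that refactor, a hedged sketch of a `BatchWriteItem` call in the style of the `aws-sdk-dynamodb` 1.x builders (the table name and item contents are illustrative, not the PR's code):

```rust
use std::collections::HashMap;

use aws_sdk_dynamodb::types::{AttributeValue, PutRequest, WriteRequest};
use aws_sdk_dynamodb::Client;

// DynamoDB accepts at most 25 write requests per BatchWriteItem call, so a
// real sink has to chunk its buffer and retry whatever the service returns
// in unprocessed_items.
async fn flush_batch(client: &Client, table: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut item = HashMap::new();
    item.insert("id".to_string(), AttributeValue::S("123".to_string()));

    let put = PutRequest::builder().set_item(Some(item)).build()?;
    let write = WriteRequest::builder().put_request(put).build();

    let resp = client
        .batch_write_item()
        .request_items(table, vec![write])
        .send()
        .await?;

    // Left as an exercise here: re-submit resp.unprocessed_items().
    let _ = resp.unprocessed_items();
    Ok(())
}
```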
DynamoDB can be deployed locally via a Docker image: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html

Should I add it to this PR, or should I open another PR?

@xiangjinwu @fuyufjh @yuhao-su I've made the changes, please take a look when you have time 😃
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add a DynamoDB sink with a 1:1 mapping to a DynamoDB table.
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
new sink: DynamoDB