
[Feature] 1-1 mapping between paimon buckets and kafka partitions #3249

Open
1 of 2 tasks
polyzos opened this issue Apr 23, 2024 · 2 comments
Labels
enhancement New feature or request

Comments

@polyzos

polyzos commented Apr 23, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

When creating an Append for Message Queue table, as shown in the screenshot here:
[Screenshot 2024-04-23 at 8 33 05 AM]

we notice the following:

  1. 5 buckets are specified, but a bucket is not created until data arrives for it.
  2. In Kafka, partition 3 holds keys 2, 3 and 4.
  3. These keys, however, end up in different Paimon buckets.
  4. Paimon performs a shuffle even though the parallelism is the same, because there is no 1-1 mapping between Kafka partitions and Paimon buckets.
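To make points 2-4 concrete, here is a minimal Python sketch. It assumes the layout described above (Kafka partition 3 holding keys 2, 3 and 4), assumes the table's bucket key is the record key, and uses `abs(hash(key) % num_buckets)` purely as a stand-in bucket function (Python's `hash()` of a small non-negative int is the int itself). Paimon's real bucket hash may differ; the helper names are hypothetical.

```python
# Hypothetical illustration: which bucket each key from one Kafka partition lands in.
# Assumption: bucket = abs(hash(key) % num_buckets). Python's hash() of a small
# non-negative int is the int itself, so this is a plain modulo stand-in,
# NOT necessarily Paimon's exact hash function.

NUM_BUCKETS = 5  # the table in the issue specifies 5 buckets

# Layout described in the issue: Kafka partition 3 holds keys 2, 3 and 4.
kafka_layout = {3: [2, 3, 4]}


def bucket_for_key(key: int, num_buckets: int) -> int:
    """Stand-in bucket assignment for a record key."""
    return abs(hash(key) % num_buckets)


def buckets_per_partition(layout: dict[int, list[int]], num_buckets: int) -> dict[int, set[int]]:
    """For every Kafka partition, collect the set of buckets its keys map to."""
    return {
        partition: {bucket_for_key(k, num_buckets) for k in keys}
        for partition, keys in layout.items()
    }


if __name__ == "__main__":
    fan_out = buckets_per_partition(kafka_layout, NUM_BUCKETS)
    # One Kafka partition fans out to several buckets, so the task reading that
    # partition has to hand records to several bucket writers (i.e. a shuffle).
    print(fan_out)
```

Under this stand-in, partition 3 maps to three distinct buckets, which is the fan-out that forces a shuffle between the source and the writers.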

Because this table type offers Kafka-like message-queue functionality, some users are confused: they expect the same partitioning to be preserved, i.e. a 1-1 mapping between a Kafka partition and a Paimon bucket.

At the same time, I believe this would be a really good enhancement, and it should also allow removing the shuffle between the operators, thus improving performance.

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
polyzos added the enhancement (New feature or request) label Apr 23, 2024
polyzos changed the title from "[Feature]" to "[Feature] Have a 1-1 mapping between paimon buckets and kafka partitions" Apr 23, 2024
polyzos changed the title from "[Feature] Have a 1-1 mapping between paimon buckets and kafka partitions" to "[Feature] 1-1 mapping between paimon buckets and kafka partitions" Apr 23, 2024
@eric666666
Contributor

If you want a 1->1 mapping, Paimon's bucket number should be at least the number of Kafka partitions, and records should be shuffled by Kafka partition id.
I think Paimon can already do what you describe.
Here is a demo; you can define the DDL like this.
Kafka source table:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` INT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

Paimon sink table:

CREATE TABLE IF NOT EXISTS sink_paimon_table
WITH (
  'connector' = 'paimon',
  'bucket' = '3',                        -- bucket number should be at least the number of Kafka partitions
  'bucket-key' = 'partition_id',         -- bucket key must be the Kafka partition_id
  'merge-engine' = 'deduplicate',
  'primary-key' = 'partition_id,offset'  -- primary key must be (partition_id, offset)
)
LIKE KafkaTable (EXCLUDING ALL);

So when the Kafka source table's data is inserted into the Paimon table, it is shuffled by the Kafka partition_id. partition_id is an INT, whose hash code equals its value, so this pipeline maps the records of each Kafka partition 1->1 to a Paimon bucket.
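The mapping argued for above can be checked with a small Python sketch. It takes the comment's premise at face value: the bucket is `partition_id % num_buckets` and an INT hashes to itself. Both the helper names and that identity-hash premise are assumptions; Paimon may compute the bucket from a hash of the serialized bucket-key row rather than the raw integer, so the exact partition-to-bucket assignment should be verified on your Paimon version.

```python
# Hypothetical check of the "bucket by Kafka partition id" idea.
# Premise (taken from the comment, not verified against Paimon's source):
#   bucket = partition_id % num_buckets, because hash(INT) == the INT itself.


def bucket_for_partition(partition_id: int, num_buckets: int) -> int:
    """Identity-hash-then-modulo bucket assignment (assumed, not Paimon's API)."""
    return partition_id % num_buckets


def partition_to_bucket(num_partitions: int, num_buckets: int) -> dict[int, int]:
    """Map every Kafka partition 0..num_partitions-1 to its bucket."""
    return {p: bucket_for_partition(p, num_buckets) for p in range(num_partitions)}


def is_one_to_one(mapping: dict[int, int]) -> bool:
    """True when no two Kafka partitions share a bucket."""
    return len(set(mapping.values())) == len(mapping)


if __name__ == "__main__":
    # 3 partitions, 3 buckets: partition p goes to bucket p.
    m = partition_to_bucket(3, 3)
    print(m, is_one_to_one(m))
    # 4 partitions, 3 buckets: partition 3 wraps around into bucket 0.
    m = partition_to_bucket(4, 3)
    print(m, is_one_to_one(m))
```

With at least as many buckets as partitions, no two partitions share a bucket; with more buckets than partitions, the surplus buckets simply receive no data (and, per point 1 of the issue, are never created).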

@polyzos
Author

polyzos commented May 7, 2024

@eric666666 Thanks a lot for this.
The problem here, though, is that I'm trying to use an Append for Message Queue table so that I can offload logs from Kafka, but your example uses a Primary Key table.

I also tried your example, but when creating the sink_paimon_table (paimon 0.7.0-incubating), I'm getting:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Table column [id, a, b, dt] should include all primary key constraint [partition_id, offset]

Let me know if I'm missing something.
