Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Kafka] Support multi-table source read #5992

Merged
merged 97 commits into from
May 22, 2024

Conversation

zhilinli123
Copy link
Contributor

@zhilinli123 zhilinli123 commented Dec 11, 2023

Purpose of this pull request

Support for kafka multiple data sources

Does this PR introduce any user-facing change?

The same as before

How was this patch tested?

e2e Adds multi-source tests including TOPIC Regex matching

Check list

@zhilinli123
Copy link
Contributor Author

issues: #5677

# Conflicts:
#	seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@zhilinli123
Copy link
Contributor Author

PTAL: @hailin0 @Hisoka-X

@Hisoka-X
Copy link
Member

please resolve conflict

# Conflicts:
#	seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
#	seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@zhilinli123
Copy link
Contributor Author

PTAL: @Hisoka-X @hailin0

Comment on lines 86 to 91
public DebeziumJsonDeserializationSchema(
SeaTunnelRowType rowType,
boolean ignoreParseErrors,
boolean debeziumEnabledSchema,
CatalogTable catalogTable) {
this.rowType = rowType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public DebeziumJsonDeserializationSchema(
SeaTunnelRowType rowType,
boolean ignoreParseErrors,
boolean debeziumEnabledSchema,
CatalogTable catalogTable) {
this.rowType = rowType;
public DebeziumJsonDeserializationSchema(
CatalogTable catalogTable,
boolean ignoreParseErrors,
boolean debeziumEnabledSchema) {
this.rowType = catalogTable.getSeaTunnelRowType();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update all places

@hailin0 hailin0 modified the milestones: 2.4.0, 2.3.6 May 9, 2024
…feature-multi-table-kafka

# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
#	seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java

public AvroDeserializationSchema(SeaTunnelRowType rowType) {
public AvroDeserializationSchema(SeaTunnelRowType rowType, CatalogTable catalogTable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public AvroDeserializationSchema(SeaTunnelRowType rowType, CatalogTable catalogTable) {
public AvroDeserializationSchema(CatalogTable catalogTable) {

@zhilinli123 zhilinli123 requested a review from hailin0 May 15, 2024 08:49
Copy link
Contributor

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@EricJoy2048 EricJoy2048 merged commit 6010460 into apache:dev May 22, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants