Kafka source cannot consume all historical data in batch mode #6388

Open
3 tasks done
jeanleen opened this issue Feb 26, 2024 · 7 comments · May be fixed by #6685

Comments

@jeanleen

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

The Kafka source cannot consume all historical data in batch mode.
I have more than 10 million (1000w+) messages in Kafka waiting to be consumed. When consuming them in batch mode, the job finishes after reading only part of the data:
2024-02-26 17:44:47,768 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, job id: 814432701363781634, job name: SeaTunnel
2024-02-26 17:44:47,776 WARN org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job metrics summary, it maybe first-run
2024-02-26 17:44:55,561 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (814432701363781634) end with state FINISHED
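2024-02-26 17:44:55,569 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-02-26 17:44:47
End Time                  : 2024-02-26 17:44:55
Total Time(s)             :                   8
Total Read Count          :               53847
Total Write Count         :               53847
Total Failed Count        :                   0
***********************************************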

2024-02-26 17:44:55,570 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-26 17:44:55,577 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop002]:5801:da380055-6b10-4427-b326-ffb967a19d45, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/192.168.9.112:44263->hadoop002/192.168.9.113:5801}, remoteAddress=[hadoop002]:5801, lastReadTime=2024-02-26 17:44:55.564, lastWriteTime=2024-02-26 17:44:55.562, closedTime=2024-02-26 17:44:55.573, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop001]:5801:b42b8c7f-fd07-4297-b9f5-55ea8ca87ca4, connection: ClientConnection{alive=false, connectionId=2, channel=NioChannel{/192.168.9.112:36031->hadoop001/192.168.9.112:5801}, remoteAddress=[hadoop001]:5801, lastReadTime=2024-02-26 17:44:47.433, lastWriteTime=2024-02-26 17:44:47.433, closedTime=2024-02-26 17:44:55.578, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-26 17:44:55,584 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2024-02-26 17:44:55,586 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
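
One way to quantify the gap is to compare the job's Total Read Count with the number of records available in the topic, estimated from each partition's earliest and latest offsets. The sketch below is only an illustration: the per-partition offsets are made-up placeholders (the issue does not state the partition count or the real offsets), and the helper names `available_messages` and `read_ratio` are hypothetical. The offset difference is an upper bound, since compacted or transactional topics can hold fewer records than the offsets suggest.

```python
from typing import Dict, Tuple


def available_messages(offsets: Dict[int, Tuple[int, int]]) -> int:
    """Sum (latest - earliest) over all partitions.

    `offsets` maps partition id -> (earliest_offset, latest_offset).
    """
    return sum(latest - earliest for earliest, latest in offsets.values())


def read_ratio(total_read: int, offsets: Dict[int, Tuple[int, int]]) -> float:
    """Fraction of the available records that the job actually read."""
    total = available_messages(offsets)
    # An empty topic is treated as fully read to avoid dividing by zero.
    return total_read / total if total else 1.0


# Placeholder offsets for an assumed 3-partition topic (NOT from the issue).
example_offsets = {
    0: (0, 4_000_000),
    1: (0, 3_500_000),
    2: (0, 3_000_000),
}

# "Total Read Count" taken from the job statistics in the log above.
total_read = 53847

if __name__ == "__main__":
    print("available:", available_messages(example_offsets))
    print(f"read ratio: {read_ratio(total_read, example_offsets):.2%}")
```

With real offsets substituted in, a ratio far below 100% would confirm that the batch job stopped before reaching the end of the topic.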

SeaTunnel Version

2.3.3

SeaTunnel Config

{
    "env" : {
        "parallelism" : 3,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 30000,
        "job.name" : "oalu_kafka"
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "Price" : "STRING",
                    "Aplus" : "STRING",
                    "AmazonChoice" : "STRING",
                    "Brand" : "STRING"
                }
            },
            "consumer.group" : "amc",
            "commit_on_checkpoint" : true,
            "format" : "json",
            "topic" : "lurProduct",
            "bootstrap.servers" : "192.168.9.31:9092,192.168.9.81:9092",
            "plugin_name" : "Kafka",
            "kafka.config" : {
                "client.id" : "client_1",
                "max.poll.records" : "100000",
                "auto.offset.reset" : "earliest",
                "max.partition.fetch.bytes" : "52428800",
                "session.timeout.ms" : "30000"
            }
        }
    ],
    "sink" : [
        {
            "fs.defaultFS" : "hdfs://hadoop0011:8020",
            "path" : "/user/hive/warehouse/stg.db/pdc_lu_product_data/",
            "bath_size" : 10000,
            "file_format_type" : "orc",
            "plugin_name" : "HdfsFile"
        }
    ]
}

Running Command

/usr/local/apache-seatunnel-2.3.3/bin/seatunnel.sh --config pdc_lu_kafka_bath.conf

Error Exception

2024-02-26 17:44:47,539 INFO  org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,540 INFO  org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,543 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,547 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: Kafka at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
2024-02-26 17:44:47,547 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: HdfsFile at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
2024-02-26 17:44:47,552 INFO  org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sources.
2024-02-26 17:44:47,554 INFO  org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,570 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSource Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,574 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: Kafka at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
2024-02-26 17:44:47,578 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Kafka'} from classpath
2024-02-26 17:44:47,598 INFO  org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all transforms.
2024-02-26 17:44:47,598 INFO  org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sinks.
2024-02-26 17:44:47,599 INFO  org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,601 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,601 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: HdfsFile at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
2024-02-26 17:44:47,603 INFO  org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='HdfsFile'} from classpath
2024-02-26 17:44:47,702 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Start submit job, job id: 814432701363781634, with plugin jar [file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/ojdbc8-19.10.0.0.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/hive/lib/hive-exec-2.3.9.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/mysql-connector-java-8.0.32.jar, file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/orai18n-19.10.0.0.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/hive-jdbc-3.1.3.jar]
2024-02-26 17:44:47,768 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, job id: 814432701363781634, job name: SeaTunnel
2024-02-26 17:44:47,776 WARN  org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job metrics summary, it maybe first-run
2024-02-26 17:44:55,561 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (814432701363781634) end with state FINISHED
2024-02-26 17:44:55,569 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-02-26 17:44:47
End Time                  : 2024-02-26 17:44:55
Total Time(s)             :                   8
Total Read Count          :               53847
Total Write Count         :               53847
Total Failed Count        :                   0
***********************************************

2024-02-26 17:44:55,570 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-26 17:44:55,577 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop002]:5801:da380055-6b10-4427-b326-ffb967a19d45, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/192.168.9.112:44263->hadoop002/192.168.9.113:5801}, remoteAddress=[hadoop002]:5801, lastReadTime=2024-02-26 17:44:55.564, lastWriteTime=2024-02-26 17:44:55.562, closedTime=2024-02-26 17:44:55.573, connected server version=5.1}
2024-02-26 17:44:55,580 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop001]:5801:b42b8c7f-fd07-4297-b9f5-55ea8ca87ca4, connection: ClientConnection{alive=false, connectionId=2, channel=NioChannel{/192.168.9.112:36031->hadoop001/192.168.9.112:5801}, remoteAddress=[hadoop001]:5801, lastReadTime=2024-02-26 17:44:47.433, lastWriteTime=2024-02-26 17:44:47.433, closedTime=2024-02-26 17:44:55.578, connected server version=5.1}
2024-02-26 17:44:55,580 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-26 17:44:55,584 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-26 17:44:55,584 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-02-26 17:44:55,584 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2024-02-26 17:44:55,586 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal

Zeta or Flink or Spark Version

zeta

Java or Scala Version

java 1.8
scala 2.12

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@jeanleen jeanleen added the bug label Feb 26, 2024
@jeanleen
Author

In stream mode the job consumes data continuously, which works as expected.

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Mar 28, 2024

github-actions bot commented Apr 4, 2024

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

@SbloodyS
Member

I'm encountering this issue too. I've already submitted PR #6685 to fix it.

@Hisoka-X Hisoka-X reopened this Apr 12, 2024
@logan2013

I'm encountering this issue too.

@logan2013

> I'm encountering this issue too. I've already submitted PR #6685 to fix it.

Why can't this PR be accepted?

@Hisoka-X
Member

Hisoka-X commented May 9, 2024

> > I'm encountering this issue too. I've already submitted PR #6685 to fix it.
>
> Why can't this PR be accepted?

We need a test case in this PR that reproduces the bug, to make sure the same bug does not happen again in the future.
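
As a rough illustration of the kind of regression test being asked for, here is a minimal sketch. It does not use SeaTunnel or Kafka APIs: `FakePartition` and `read_bounded` are hypothetical stand-ins, and the stopping rule (read until the end offset captured when the job starts) is an assumption about what correct batch behavior looks like, not a description of what PR #6685 does. The point is only that the assertion compares the number of rows read against the full partition size, using more records than a single poll can return.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakePartition:
    """In-memory stand-in for one Kafka partition (hypothetical)."""
    records: List[str]

    def end_offset(self) -> int:
        return len(self.records)

    def poll(self, offset: int, max_records: int) -> List[str]:
        # Return at most `max_records` records starting at `offset`,
        # mimicking a single fetch that cannot return everything at once.
        return self.records[offset:offset + max_records]


def read_bounded(partition: FakePartition, max_poll_records: int) -> List[str]:
    """Read every record that existed at start time, then stop."""
    stop_at = partition.end_offset()  # snapshot the end offset once, up front
    offset = 0
    rows: List[str] = []
    while offset < stop_at:
        batch = partition.poll(offset, min(max_poll_records, stop_at - offset))
        if not batch:
            # A real reader would retry; the fake always has data here,
            # so an empty batch indicates a bug in the test setup.
            raise RuntimeError("unexpected empty poll before end offset")
        rows.extend(batch)
        offset += len(batch)
    return rows


def test_batch_reads_all_history() -> None:
    # 1050 records with a poll size of 100 forces more than ten polls,
    # so a reader that stops after its first poll would fail this test.
    partition = FakePartition([f"msg-{i}" for i in range(1050)])
    rows = read_bounded(partition, max_poll_records=100)
    assert len(rows) == partition.end_offset()
    assert rows[0] == "msg-0"
    assert rows[-1] == "msg-1049"


if __name__ == "__main__":
    test_batch_reads_all_history()
    print("ok")
```

A test in the actual PR would need to run against the connector itself, for example with a real or containerized Kafka broker, but the same shape applies: produce more records than one poll returns, run the batch job, and assert the read count equals the number produced.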
