[Bug] [Connector-JDBC] Table structure synchronization #6799

Open
2 of 3 tasks
Ivan-gfan opened this issue May 6, 2024 · 1 comment

Search before asking

  • I have searched the issues and found no similar issues.

What happened

When synchronizing from MySQL to MySQL with a FieldMapper transform added and schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" configured on the sink, automatic table creation fails: every field type in the generated DDL is null.

The log shows the following DDL statements:

2024-05-06 18:48:28,434 INFO  org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table awakening_earth_web.awakening_earth_web.testCreate with action CREATE TABLE `testCreate` (
	`id` null NOT NULL COMMENT '主键id', 
	`name` null NOT NULL COMMENT '数据源名称', 
	`plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
	`type` null NOT NULL COMMENT '数据源类型', 
	`configuration` null NOT NULL COMMENT '数据源连接配置', 
	`description` null NULL COMMENT '数据源描述', 
	`icon` null NULL COMMENT '图标', 
	PRIMARY KEY (`id`)
) COMMENT = '';
2024-05-06 18:48:29,499 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Execute sql : CREATE TABLE `testCreate` (
	`id` null NOT NULL COMMENT '主键id', 
	`name` null NOT NULL COMMENT '数据源名称', 
	`plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
	`type` null NOT NULL COMMENT '数据源类型', 
	`configuration` null NOT NULL COMMENT '数据源连接配置', 
	`description` null NULL COMMENT '数据源描述', 
	`icon` null NULL COMMENT '图标', 
	PRIMARY KEY (`id`)
) COMMENT = '';

I traced the code and found that the MysqlCreateTableSqlBuilder class checks whether the incoming catalog name is MySQL (i.e. the same as the current sink). If it is, it uses column.getSourceType() directly as the column type, but the sourceType attribute is null here, which causes this error:

private String buildColumnIdentifySql(Column column, String catalogName) {
    final List<String> columnSqls = new ArrayList<>();
    columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
    boolean isSupportDef = true;

    if ((SqlType.TIME.equals(column.getDataType().getSqlType())
                    || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
            && column.getScale() != null) {
        BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
        columnSqls.add(typeDefine.getColumnType());
    } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
        // Source catalog is MySQL: the original column type string is reused verbatim.
        // For columns produced by FieldMapper, getSourceType() returns null, and
        // String.join below renders it as the literal "null" ("`id` null NOT NULL ...").
        columnSqls.add(column.getSourceType());
    } else {
        BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
        columnSqls.add(typeDefine.getColumnType());
    }
    // nullable
    if (column.isNullable()) {
        columnSqls.add("NULL");
    } else {
        columnSqls.add("NOT NULL");
    }

    if (column.getComment() != null) {
        columnSqls.add("COMMENT '" + column.getComment() + "'");
    }

    return String.join(" ", columnSqls);
}
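To make the failure mode concrete, here is a minimal, self-contained sketch of the type-selection branch above with a defensive fallback: when the source catalog is MySQL but sourceType is null, it falls back to a converted type instead of emitting `null`. SimpleColumn, reconvert and the "MySQL" constant are hypothetical stand-ins for SeaTunnel's Column, typeConverter.reconvert(column).getColumnType() and DatabaseIdentifier.MYSQL; the type mapping is made up, and this is not the project's actual fix.

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnSqlSketch {

    // Hypothetical stand-in for SeaTunnel's Column, reduced to the fields used here.
    static final class SimpleColumn {
        final String name;
        final String sourceType;  // original database type; null for columns rebuilt by a transform
        final String logicalType; // hypothetical engine-level type name, e.g. "BIGINT"
        final boolean nullable;
        final String comment;

        SimpleColumn(String name, String sourceType, String logicalType,
                     boolean nullable, String comment) {
            this.name = name;
            this.sourceType = sourceType;
            this.logicalType = logicalType;
            this.nullable = nullable;
            this.comment = comment;
        }
    }

    // Hypothetical catalog name; the real code compares against DatabaseIdentifier.MYSQL.
    static final String MYSQL = "MySQL";

    // Hypothetical stand-in for typeConverter.reconvert(column).getColumnType().
    static String reconvert(SimpleColumn column) {
        switch (column.logicalType) {
            case "STRING":
                return "LONGTEXT";
            default:
                return column.logicalType;
        }
    }

    static String buildColumnSql(SimpleColumn column, String catalogName) {
        List<String> parts = new ArrayList<>();
        parts.add("`" + column.name + "`");
        // Reuse the original type only if the source catalog is MySQL AND the type is known;
        // otherwise derive it from the logical type instead of emitting the string "null".
        if (MYSQL.equals(catalogName) && column.sourceType != null) {
            parts.add(column.sourceType);
        } else {
            parts.add(reconvert(column));
        }
        parts.add(column.nullable ? "NULL" : "NOT NULL");
        if (column.comment != null) {
            parts.add("COMMENT '" + column.comment + "'");
        }
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        SimpleColumn fromSource = new SimpleColumn("id", "bigint(20)", "BIGINT", false, "id");
        SimpleColumn fromTransform = new SimpleColumn("id", null, "BIGINT", false, "id");
        System.out.println(buildColumnSql(fromSource, MYSQL));    // `id` bigint(20) NOT NULL COMMENT 'id'
        System.out.println(buildColumnSql(fromTransform, MYSQL)); // `id` BIGINT NOT NULL COMMENT 'id'
    }
}
```

The fallback keeps the "reuse the exact MySQL type" optimisation for columns that still carry it, and only degrades to type conversion when that information was lost upstream.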

Tracing further, I found that when the sink is built (MultipleTableJobConfigParser#parseSink), the tableWithActionMap parameter holds both the source-side field structure and the transform's field structure, and in the transform's field structure sourceType is null:

ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
String factoryId = getFactoryId(readonlyConfig);
List<String> inputIds = getInputIds(readonlyConfig);

List<List<Tuple2<CatalogTable, Action>>> inputVertices =
        inputIds.stream()
                .map(tableWithActionMap::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
if (inputVertices.isEmpty()) {
    // Tolerates incorrect configuration of simple graph
    inputVertices = Collections.singletonList(findLast(tableWithActionMap));
} else if (inputVertices.size() > 1) {
    for (List<Tuple2<CatalogTable, Action>> inputVertex : inputVertices) {
        if (inputVertex.size() > 1) {
            throw new JobDefineCheckException(
                    "Sink don't support simultaneous writing of data from multi-table source and other sources.");
        }
    }
}
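The lookup above can be modelled in isolation. In this simplified sketch, a Map<String, List<ColumnInfo>> stands in for tableWithActionMap, and ColumnInfo, resolveInputs and columnsWithoutSourceType are hypothetical names; the MySQL types are made up. The source registers test1 with known source types, the FieldMapper registers test2 with rebuilt columns, and resolving the sink's input test2 yields the transform's columns, whose source types are null.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class InputLookupSketch {

    // Hypothetical, simplified column metadata.
    static final class ColumnInfo {
        final String name;
        final String sourceType; // null when a transform rebuilt the column

        ColumnInfo(String name, String sourceType) {
            this.name = name;
            this.sourceType = sourceType;
        }
    }

    // Same shape as the parseSink lookup: resolve each input id and skip unknown ids.
    static List<List<ColumnInfo>> resolveInputs(Map<String, List<ColumnInfo>> tables,
                                                List<String> inputIds) {
        return inputIds.stream()
                .map(tables::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Names of the columns whose source type is missing.
    static List<String> columnsWithoutSourceType(List<ColumnInfo> columns) {
        return columns.stream()
                .filter(c -> c.sourceType == null)
                .map(c -> c.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<ColumnInfo>> tables = new HashMap<>();
        tables.put("test1", Arrays.asList(
                new ColumnInfo("id", "bigint(20)"), new ColumnInfo("plugin_name", "varchar(255)")));
        tables.put("test2", Arrays.asList(
                new ColumnInfo("id", null), new ColumnInfo("plugin_name_test", null)));
        List<List<ColumnInfo>> inputs = resolveInputs(tables, Arrays.asList("test2"));
        System.out.println(columnsWithoutSourceType(inputs.get(0))); // [id, plugin_name_test]
    }
}
```

In this model the sink only ever sees the transform's view of the table, so any metadata the transform drops is gone by the time the save-mode handler builds the DDL.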

For now, it is not clear to me whether this is the expected result. This is as far as I have traced it; could someone help determine where the problem originates?

Given this, the error is probably not limited to MySQL-to-MySQL: it should occur whenever the source and sink are of the same type and a FieldMapper transform sits between them.

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source{
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "root"
        database = "test"
        table_list = [
            {
                table_path = "test.source"
            }
        ]
        result_table_name = "test1"
    }
}

transform {
  FieldMapper {
    source_table_name = "test1"
    result_table_name = "test2"
    field_mapper = {
        id = id
        name = name
        plugin_name = plugin_name_test
        type = type
        configuration = configuration
        description = description
        icon = icon
    }
  }
}

sink {
    jdbc {
        url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "root"
        database = "test"
        table = "test.testCreate"
        generate_sink_sql = true
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode="APPEND_DATA"
        source_table_name = "test2"
    }
}

Running Command

I ran the job in cluster mode using the startup class under the `example-module` in the source code (SeaTunnelEngineExample, per the stack trace).

Error Exception

2024-05-06 18:48:27,337 INFO  org.apache.seatunnel.api.sink.SaveModeExecuteWrapper - Executing save mode for table: awakening_earth_web.awakening_earth_web.testCreate, with SchemaSaveMode: CREATE_SCHEMA_WHEN_NOT_EXIST, DataSaveMode: APPEND_DATA using Catalog: MySQL
2024-05-06 18:48:28,434 INFO  org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table test.awakening_earth_web.testCreate with action CREATE TABLE `testCreate` (
	`id` null NOT NULL COMMENT '主键id', 
	`name` null NOT NULL COMMENT '数据源名称', 
	`plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
	`type` null NOT NULL COMMENT '数据源类型', 
	`configuration` null NOT NULL COMMENT '数据源连接配置', 
	`description` null NULL COMMENT '数据源描述', 
	`icon` null NULL COMMENT '图标', 
	PRIMARY KEY (`id`)
) COMMENT = '';
2024-05-06 18:48:29,499 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Execute sql : CREATE TABLE `testCreate` (
	`id` null NOT NULL COMMENT '主键id', 
	`name` null NOT NULL COMMENT '数据源名称', 
	`plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
	`type` null NOT NULL COMMENT '数据源类型', 
	`configuration` null NOT NULL COMMENT '数据源连接配置', 
	`description` null NULL COMMENT '数据源描述', 
	`icon` null NULL COMMENT '图标', 
	PRIMARY KEY (`id`)
) COMMENT = '';
2024-05-06 18:48:29,516 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Catalog MySQL closing
2024-05-06 18:48:29,516 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-05-06 18:48:29,519 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5801:7c595412-458a-4afd-9528-f6508f4e1ad6, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:52796->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-05-06 18:48:29.504, lastWriteTime=2024-05-06 18:48:29.503, closedTime=2024-05-06 18:48:29.517, connected server version=5.1}
2024-05-06 18:48:29,519 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-05-06 18:48:29,556 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-05-06 18:48:29,557 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 

===============================================================================


2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error, 

2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed 

2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:206)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineExample.main(SeaTunnelEngineExample.java:45)
Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:671)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:657)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:569)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:216)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:105)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:173)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:153)
	... 2 more
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table awakening_earth_web.awakening_earth_web.testCreate
	at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:365)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:351)
	at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181)
	at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108)
	at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
	at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
	at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:669)
	... 8 more
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'null NOT NULL COMMENT '主键id', 
	`name` null NOT NULL COMMENT '数据源名�' at line 2
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:371)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:531)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:362)
	... 15 more
 
2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
===============================================================================




Zeta or Flink or Spark Version

No response

Java or Scala Version

jdk1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Ivan-gfan Ivan-gfan added the bug label May 6, 2024
@Ivan-gfan
Contributor Author

The problem is the PhysicalColumn.of call in the FieldMapper class of the transform module: it leaves sourceType empty. I have already fixed this in our internal codebase.
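A minimal sketch of the kind of change described, assuming the fix is to carry the original column's sourceType over when FieldMapper renames a column. Col and renameColumns are hypothetical stand-ins, not SeaTunnel's PhysicalColumn.of or the actual patch; rejecting unknown input fields is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldMapperSketch {

    // Hypothetical immutable column: name, engine type, and optional source (database) type.
    static final class Col {
        final String name;
        final String dataType;
        final String sourceType;

        Col(String name, String dataType, String sourceType) {
            this.name = name;
            this.dataType = dataType;
            this.sourceType = sourceType;
        }
    }

    // Builds output columns from a FieldMapper-style mapping (input name -> output name).
    // Instead of rebuilding each column from name and data type alone, it copies sourceType,
    // so a same-type sink can still reuse the original column definition.
    static List<Col> renameColumns(List<Col> input, Map<String, String> fieldMapper) {
        Map<String, Col> byName = new HashMap<>();
        for (Col c : input) {
            byName.put(c.name, c);
        }
        List<Col> output = new ArrayList<>();
        for (Map.Entry<String, String> e : fieldMapper.entrySet()) {
            Col original = byName.get(e.getKey());
            if (original == null) {
                // Sketch-only behaviour: fail fast on a mapping that names a missing field.
                throw new IllegalArgumentException("Unknown input field: " + e.getKey());
            }
            output.add(new Col(e.getValue(), original.dataType, original.sourceType));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Col> input = new ArrayList<>();
        input.add(new Col("id", "BIGINT", "bigint(20)"));
        input.add(new Col("plugin_name", "STRING", "varchar(255)"));
        // LinkedHashMap keeps the mapping order, which fixes the output column order.
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "id");
        mapping.put("plugin_name", "plugin_name_test");
        for (Col c : renameColumns(input, mapping)) {
            System.out.println(c.name + " " + c.sourceType);
        }
    }
}
```

With sourceType preserved, the MySQL branch of buildColumnIdentifySql would receive a real type string (here, varchar(255) for plugin_name_test) instead of null.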
