Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read #6684

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
Expand Down Expand Up @@ -177,28 +177,26 @@ private List<ChunkRange> splitTableIntoChunks(
tableId,
inverseSamplingRate);
Object[] sample =
sampleDataFromColumn(
jdbc, tableId, splitColumnName, inverseSamplingRate);
sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
log.info(
"Sample data from table {} end, the sample size is {}",
tableId,
sample.length);
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}

/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
protected List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
Expand All @@ -207,15 +205,15 @@ protected List<ChunkRange> splitUnevenlySizedChunks(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
Expand All @@ -226,17 +224,17 @@ protected Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectCompare(chunkEnd, max) >= 0) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;
Expand All @@ -35,16 +36,29 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@Override
Collection<SnapshotSplit> generateSplits(TableId tableId);

/** @deprecated Use {@link #queryMinMax(JdbcConnection, TableId, Column)} instead. */
@Deprecated
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException;

/**
* Query the minimum and maximum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s), MAX(%s) FROM %s</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @return an array holding the minimum value at index 0 and the maximum value at index 1.
*/
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
default Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return queryMinMax(jdbc, tableId, column.name());
}

/** @deprecated Use {@link #queryMin(JdbcConnection, TableId, Column, Object)} instead. */
@Deprecated
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException;

/**
Expand All @@ -54,12 +68,19 @@ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
default Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}

@Deprecated
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
throws SQLException;

/**
Expand All @@ -68,14 +89,29 @@ Object queryMin(
*
* @param jdbc The JDBC connection object used to connect to the database.
* @param tableId The ID of the table in which the column resides.
* @param columnName The name of the column to be sampled.
* @param column The column to be sampled.
* @param samplingRate The inverse of the fraction of the data to be sampled from the column.
* For example, a value of 1000 would mean 1/1000 of the data will be sampled.
* @return An array of sampled data from the specified column.
* @throws SQLException If an SQL error occurs during the sampling operation.
*/
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
default Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int samplingRate)
throws SQLException {
return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate);
}

/**
* @deprecated Use {@link #queryNextChunkMax(JdbcConnection, TableId, Column, int, Object)}
* instead.
*/
@Deprecated
Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
int chunkSize,
Object includedLowerBound)
throws SQLException;

/**
Expand All @@ -85,18 +121,20 @@ Object[] sampleDataFromColumn(
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
Object queryNextChunkMax(
default Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException;
throws SQLException {
return queryNextChunkMax(jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}

/**
* Approximate total number of entries in the lookup table.
Expand All @@ -110,17 +148,14 @@ Object queryNextChunkMax(
/**
* Build the scan query sql of the {@link SnapshotSplit}.
*
* @param tableId table identity.
* @param table table.
* @param splitKeyType primary key type.
* @param isFirstSplit whether the first split.
* @param isLastSplit whether the last split.
* @return query sql.
*/
String buildSplitScanQuery(
TableId tableId,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit);
Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit);

/**
* Checks whether split column is evenly distributed across its range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)

@Override
public String buildSplitScanQuery(
TableId tableId,
Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;
Expand Down Expand Up @@ -217,7 +218,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)

@Override
public String buildSplitScanQuery(
TableId tableId,
Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -79,11 +80,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws

@Override
public String buildSplitScanQuery(
TableId tableId,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return MySqlUtils.buildSplitScanQuery(table.id(), splitKeyType, isFirstSplit, isLastSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import oracle.sql.ROWID;
Expand Down Expand Up @@ -84,11 +85,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws

@Override
public String buildSplitScanQuery(
TableId tableId,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return OracleUtils.buildSplitScanQuery(table.id(), splitKeyType, isFirstSplit, isLastSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -43,22 +44,43 @@ public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialec
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
return PostgresUtils.queryMinMax(jdbc, tableId, columnName);
return PostgresUtils.queryMinMax(jdbc, tableId, columnName, null);
}

@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return PostgresUtils.queryMinMax(jdbc, tableId, column.name(), column);
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
return PostgresUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return PostgresUtils.queryMin(jdbc, tableId, columnName, null, excludedLowerBound);
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return PostgresUtils.queryMin(jdbc, tableId, column.name(), column, excludedLowerBound);
}

@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
jdbc, tableId, columnName, null, inverseSamplingRate);
}

@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int inverseSamplingRate)
throws SQLException {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, column.name(), column, inverseSamplingRate);
}

@Override
Expand All @@ -70,7 +92,19 @@ public Object queryNextChunkMax(
Object includedLowerBound)
throws SQLException {
return PostgresUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, columnName, null, chunkSize, includedLowerBound);
}

@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return PostgresUtils.queryNextChunkMax(
jdbc, tableId, column.name(), column, chunkSize, includedLowerBound);
}

@Override
Expand All @@ -80,11 +114,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws

@Override
public String buildSplitScanQuery(
TableId tableId,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
return PostgresUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return PostgresUtils.buildSplitScanQuery(table, splitKeyType, isFirstSplit, isLastSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void createDataEventsForTable(

final String selectSql =
PostgresUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
table,
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
Expand Down