Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] StarRocks Source support multiple table #6784

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
100 changes: 84 additions & 16 deletions docs/en/connector-v2/source/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| table_list | array | no | - |
| scan_filter | string | no | - |
| schema | config | yes | - |
| request_tablet_size | int | no | Integer.MAX_VALUE |
Expand Down Expand Up @@ -57,6 +58,10 @@ The name of StarRocks database

The name of StarRocks table

### table_list [array]

The list of tables to be read. You can use this configuration instead of `table`.

### scan_filter [string]

Filter expression of the query, which is transparently transmitted to StarRocks. StarRocks uses this expression to complete source-side data filtering.
Expand All @@ -76,7 +81,7 @@ The schema of the starRocks that you want to generate
e.g.

```
schema {
schema = {
fields {
name = string
age = int
Expand Down Expand Up @@ -153,28 +158,91 @@ source {
table = "e2e_table_source"
scan_batch_rows = 10
max_retries = 3
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
schema = {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
scan.params.scanner_thread_pool_thread_num = "3"

}
}
```

## Example 2: Multiple tables

```
source {
StarRocks {
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table_list = [
{
table = "e2e_table_source"
schema = {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
},
{
table = "e2e_table_source_2"
schema = {
fields {
BIGINT_COL_2 = BIGINT
LARGEINT_COL_2 = STRING
SMALLINT_COL_2 = SMALLINT
TINYINT_COL_2 = TINYINT
BOOLEAN_COL_2 = BOOLEAN
DECIMAL_COL_2 = "DECIMAL(20, 1)"
DOUBLE_COL_2 = DOUBLE
FLOAT_COL_2 = FLOAT
INT_COL_2 = INT
CHAR_COL_2 = STRING
VARCHAR_11_COL_2 = STRING
STRING_COL_2 = STRING
DATETIME_COL_2 = TIMESTAMP
DATE_COL_2 = DATE
}
}
}]
scan_batch_rows = 10
max_retries = 3
scan.params.scanner_thread_pool_thread_num = "3"

}
}

```

## Changelog

### next version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FILE_OPERATION_FAILED("COMMON-01", "<identifier> <operation> file '<fileName>' failed."),
JSON_OPERATION_FAILED(
"COMMON-02", "<identifier> JSON convert/parse '<payload>' operation failed."),
UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"),
ILLEGAL_ARGUMENT("COMMON-06", "Illegal argument"),
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),
UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding '<encoding>'"),
WRITER_OPERATION_FAILED(
"COMMON-11", "Sink writer operation failed, such as (open, close) etc..."),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
"COMMON-16",
"'<connector>' <type> unsupported convert type '<dataType>' of '<field>' to SeaTunnel data type."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
Expand Down Expand Up @@ -68,7 +68,7 @@ public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
String host = getAvailableHost();
if (null == host) {
throw new StarRocksConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
CommonErrorCode.ILLEGAL_ARGUMENT,
"None of the host in `load_url` could be connected.");
}
String loadUrl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
}

public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
eos.set(false);
this.readerOffset = 0;
this.rowBatch = null;
this.seaTunnelRowType = seaTunnelRowType;
Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
params.setTablet_ids(new ArrayList<>(tabletIds));
params.setOpaqued_query_plan(partition.getQueryPlan());
params.setCluster(DEFAULT_CLUSTER_NAME);
params.setDatabase(sourceConfig.getDatabase());
params.setTable(sourceConfig.getTable());
params.setDatabase(partition.getDatabase());
params.setTable(partition.getTable());
params.setUser(sourceConfig.getUsername());
params.setPasswd(sourceConfig.getPassword());
params.setBatch_size(sourceConfig.getBatchRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

Expand All @@ -39,44 +40,42 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class StarRocksQueryPlanReadClient {
private RetryUtils.RetryMaterial retryMaterial;
private SourceConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;
private final HttpHelper httpHelper = new HttpHelper();
private final Map<String, StarRocksSourceTableConfig> tables;

private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

public StarRocksQueryPlanReadClient(
SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) {
public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.seaTunnelRowType = seaTunnelRowType;
this.retryMaterial =
new RetryUtils.RetryMaterial(
sourceConfig.getMaxRetries(),
true,
exception -> true,
DEFAULT_SLEEP_TIME_MS);
this.tables =
sourceConfig.getTableConfigList().stream()
.collect(
Collectors.toMap(
StarRocksSourceTableConfig::getTable, Function.identity()));
}

public List<QueryPartition> findPartitions() {
List<String> nodeUrls = sourceConfig.getNodeUrls();
QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
public List<QueryPartition> findPartitions(String table) {
QueryPlan queryPlan = getQueryPlan(genQuerySql(table), table);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
queryPlan.getQueryPlan(),
sourceConfig.getDatabase(),
sourceConfig.getTable());
return tabletsMapToPartition(be2Tablets, queryPlan.getQueryPlan(), table);
}

private List<QueryPartition> tabletsMapToPartition(
Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan,
String database,
String table)
Map<String, List<Long>> be2Tablets, String opaquedQueryPlan, String table)
throws IllegalArgumentException {
int tabletsSize = sourceConfig.getRequestTabletSize();
List<QueryPartition> partitions = new ArrayList<>();
Expand All @@ -98,7 +97,7 @@ private List<QueryPartition> tabletsMapToPartition(
first = first + tabletsSize;
QueryPartition partitionDefinition =
new QueryPartition(
database,
sourceConfig.getDatabase(),
table,
beInfo.getKey(),
partitionTablets,
Expand Down Expand Up @@ -134,8 +133,9 @@ private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
return beXTablets;
}

private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
private QueryPlan getQueryPlan(String querySQL, String table) {

List<String> nodeUrls = sourceConfig.getNodeUrls();
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("sql", querySQL);
String body = JsonUtils.toJsonString(bodyMap);
Expand All @@ -147,7 +147,7 @@ private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append(table)
.append("/_query_plan")
.toString();
try {
Expand Down Expand Up @@ -183,15 +183,17 @@ private Map<String, String> getQueryPlanHttpHeader() {
return headerMap;
}

private String genQuerySql() {
private String genQuerySql(String table) {

StarRocksSourceTableConfig starRocksSourceTableConfig = tables.get(table);
SeaTunnelRowType seaTunnelRowType =
starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType();
String columns =
seaTunnelRowType.getFieldNames().length != 0
? String.join(",", seaTunnelRowType.getFieldNames())
: "*";
String filter =
sourceConfig.getScanFilter().isEmpty()
? ""
: " where " + sourceConfig.getScanFilter();
String scanFilter = starRocksSourceTableConfig.getScanFilter();
String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter;

String sql =
"select "
Expand All @@ -202,7 +204,7 @@ private String genQuerySql() {
+ "`"
+ "."
+ "`"
+ sourceConfig.getTable()
+ table
+ "`"
+ filter;
log.debug("Generate query sql '{}'.", sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public int compareTo(QueryPartition o) {
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
if (diffSelf.size() == 0) {
if (diffSelf.isEmpty()) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);
Expand Down Expand Up @@ -103,7 +103,7 @@ public int hashCode() {
@Override
public String toString() {
return "QueryPartition{"
+ ", database='"
+ "database='"
+ database
+ '\''
+ ", table='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ public class CommonConfig implements Serializable {
private String username;
private String password;
private String database;
private String table;

public CommonConfig(ReadonlyConfig config) {
this.nodeUrls = config.get(NODE_URLS);
this.username = config.get(USERNAME);
this.password = config.get(PASSWORD);
this.database = config.get(DATABASE);
this.table = config.get(TABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Setter
Expand Down Expand Up @@ -53,6 +57,7 @@ public SourceConfig(ReadonlyConfig config) {
key.substring(prefix.length()).toLowerCase(), value);
}
});
tableConfigList = StarRocksSourceTableConfig.of(config);
}

public static final Option<Integer> MAX_RETRIES =
Expand Down Expand Up @@ -106,6 +111,12 @@ public SourceConfig(ReadonlyConfig config) {
.noDefaultValue()
.withDescription("The parameter of the scan data from be");

public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("table list config");

private int maxRetries = MAX_RETRIES.defaultValue();
private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
private String scanFilter = SCAN_FILTER.defaultValue();
Expand All @@ -115,4 +126,5 @@ public SourceConfig(ReadonlyConfig config) {
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
private Map<String, String> sourceOptionProps = new HashMap<>();
private List<StarRocksSourceTableConfig> tableConfigList = new ArrayList<>();
}