[fix](merge-cloud) fix single rowset did not trigger compaction in cloud mode #34622

Merged
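Judging from the diff below, the cloud size-based cumulative compaction policy previously left its promotion-size computation commented out, so picked rowsets were only handed back after the delete-version and compaction-score checks; a cumulative set that had already grown past the promotion size, in particular a single large rowset, could fail those checks and never compact. The fix computes the promotion size with `cloud_promotion_size(tablet)` and returns the picked rowsets as soon as their accumulated size reaches it. The sketch below is a minimal model of that control flow, not the actual Doris code: the real `pick_input_rowsets()` is a method of `CloudSizeBasedCumulativeCompactionPolicy` taking the tablet plus version and score bounds, and `Rowset`, `data_size`, and `candidate_rowsets` here are illustrative stand-ins.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct Rowset {
        int64_t data_size; // bytes in this rowset (illustrative field)
    };

    // Returns how many rowsets were picked as compaction input.
    int32_t pick_input_rowsets(const std::vector<Rowset>& candidate_rowsets,
                               size_t promotion_size,
                               std::vector<Rowset>* input_rowsets) {
        size_t total_size = 0;
        int32_t transient_size = 0; // number of rowsets picked so far
        for (const auto& rs : candidate_rowsets) {
            total_size += static_cast<size_t>(rs.data_size);
            ++transient_size;
            input_rowsets->push_back(rs);
        }

        // The change in this PR: once the accumulated input size reaches the
        // promotion size, return the picked rowsets right away instead of
        // falling through to the delete-version and compaction-score checks.
        // A single rowset larger than promotion_size now triggers compaction.
        if (total_size >= promotion_size) {
            return transient_size;
        }

        // ... delete-version handling and min/max compaction-score thresholds
        // follow here in the real implementation ...
        return transient_size;
    }

On the test side, both regression suites now create the table with "disable_auto_compaction" = "true" instead of probing the BE config for that flag, load lineitem.csv.split01.gz twice before the first compaction so the tablet accumulates multiple rowsets, and assert unconditionally that each triggered compaction reports success.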
11 changes: 7 additions & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,7 +54,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
         size_t* compaction_score, bool allow_delete) {
-    //size_t promotion_size = tablet->cumulative_promotion_size();
+    size_t promotion_size = cloud_promotion_size(tablet);
     auto max_version = tablet->max_version().first;
     int transient_size = 0;
     *compaction_score = 0;
@@ -93,6 +93,10 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         input_rowsets->push_back(rowset);
     }

+    if (total_size >= promotion_size) {
+        return transient_size;
+    }
+
     // if there is delete version, do compaction directly
     if (last_delete_version->first != -1) {
         if (input_rowsets->size() == 1) {
@@ -154,9 +158,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     *compaction_score = new_compaction_score;

     VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
-                  << *compaction_score << ", total_size = "
-                  << total_size
-                  //<< ", calc promotion size value = " << promotion_size
+                  << *compaction_score << ", total_size = " << total_size
+                  << ", calc promotion size value = " << promotion_size
                   << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
                   << input_rowsets->size();

74 changes: 42 additions & 32 deletions regression-test/suites/compaction/test_base_compaction.groovy
@@ -33,14 +33,6 @@ suite("test_base_compaction") {
     def configList = parseJson(out.trim())
     assert configList instanceof List

-    boolean disableAutoCompaction = true
-    for (Object ele in (List) configList) {
-        assert ele instanceof List<String>
-        if (((List<String>) ele)[0] == "disable_auto_compaction") {
-            disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
-        }
-    }
-
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction") {
         UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
         DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
         PROPERTIES (
-            "replication_num" = "1"
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
         )

         """
@@ -84,7 +77,42 @@ suite("test_base_compaction") {

         // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+        time 10000 // limit inflight 10s
+
+        // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows
+
+        // if declared a check callback, the default check condition will ignore.
+        // So you must check all condition
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        // a default db 'regression_test' is specified in
+        // ${DORIS_HOME}/conf/regression-conf.groovy
+        table tableName
+
+        // default label is UUID:
+        // set 'label' UUID.randomUUID().toString()
+
+        // default column_separator is specify in doris fe config, usually is '\t'.
+        // this line change to ','
+        set 'column_separator', '|'
+        set 'compress_type', 'GZ'
+
+        // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+        // also, you can stream load a http stream, e.g. http://xxx/some.csv
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

         time 10000 // limit inflight 10s

@@ -114,13 +142,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction") {

         // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""

         time 10000 // limit inflight 10s

@@ -182,13 +204,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done
74 changes: 42 additions & 32 deletions regression-test/suites/compaction/test_base_compaction_no_value.groovy
@@ -27,20 +27,12 @@ suite("test_base_compaction_no_value") {

     backend_id = backendId_to_backendIP.keySet()[0]
     def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

     logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
     assertEquals(code, 0)
     def configList = parseJson(out.trim())
     assert configList instanceof List

-    boolean disableAutoCompaction = true
-    for (Object ele in (List) configList) {
-        assert ele instanceof List<String>
-        if (((List<String>) ele)[0] == "disable_auto_compaction") {
-            disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
-        }
-    }
-
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction_no_value") {
         UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)
         DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
         PROPERTIES (
-            "replication_num" = "1"
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
         )

         """
@@ -84,7 +77,42 @@ suite("test_base_compaction_no_value") {

         // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+        time 10000 // limit inflight 10s
+
+        // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows
+
+        // if declared a check callback, the default check condition will ignore.
+        // So you must check all condition
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        // a default db 'regression_test' is specified in
+        // ${DORIS_HOME}/conf/regression-conf.groovy
+        table tableName
+
+        // default label is UUID:
+        // set 'label' UUID.randomUUID().toString()
+
+        // default column_separator is specify in doris fe config, usually is '\t'.
+        // this line change to ','
+        set 'column_separator', '|'
+        set 'compress_type', 'GZ'
+
+        // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+        // also, you can stream load a http stream, e.g. http://xxx/some.csv
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

         time 10000 // limit inflight 10s

@@ -114,13 +142,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction_no_value") {

         // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""

         time 10000 // limit inflight 10s

@@ -182,13 +204,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }

     // wait for all compactions done