[fix](merge-cloud) fix single rowset did not trigger compaction in cloud mode (apache#34622)
luwei16 authored and M1saka2003 committed May 14, 2024
1 parent 6a61897 commit 5076c61
Showing 3 changed files with 92 additions and 69 deletions.
11 changes: 7 additions & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,7 +54,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
//size_t promotion_size = tablet->cumulative_promotion_size();
size_t promotion_size = cloud_promotion_size(tablet);
auto max_version = tablet->max_version().first;
int transient_size = 0;
*compaction_score = 0;
@@ -93,6 +93,10 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
}

if (total_size >= promotion_size) {
return transient_size;
}

// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
if (input_rowsets->size() == 1) {
@@ -154,9 +158,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
*compaction_score = new_compaction_score;

VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
<< *compaction_score << ", total_size = "
<< total_size
//<< ", calc promotion size value = " << promotion_size
<< *compaction_score << ", total_size = " << total_size
<< ", calc promotion size value = " << promotion_size
<< ", tablet = " << tablet->tablet_id() << ", input_rowset size "
<< input_rowsets->size();
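
The substance of the backend change: pick_input_rowsets now derives the promotion size from cloud_promotion_size(tablet) rather than the commented-out tablet->cumulative_promotion_size(), and it returns the picked rowsets as soon as their accumulated size reaches that threshold. Without the early return, a single rowset that was already large enough for promotion could stay below the minimum compaction score and never trigger cumulative compaction in cloud mode, which appears to be the bug named in the commit title. Below is a minimal, self-contained C++ sketch of the new control flow; Rowset and the picker signature here are simplified stand-ins, not the real Doris classes.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-in for Doris's rowset metadata (hypothetical, for illustration).
struct Rowset {
    int64_t start_version;
    int64_t end_version;
    size_t data_size;
};

// Sketch of the picker: scan candidates in version order, accumulate their size,
// and stop as soon as the total reaches promotion_size, mirroring the new
// `if (total_size >= promotion_size) return transient_size;` check in the diff.
int pick_input_rowsets(const std::vector<Rowset>& candidates, size_t promotion_size,
                       std::vector<Rowset>* input_rowsets) {
    size_t total_size = 0;
    int transient_size = 0;
    for (const Rowset& rs : candidates) {
        input_rowsets->push_back(rs);
        total_size += rs.data_size;
        ++transient_size;
        if (total_size >= promotion_size) {
            // New behavior from this commit: once the picked rowsets are big
            // enough to be promoted, return immediately -- even a single
            // rowset now qualifies for cumulative compaction.
            return transient_size;
        }
    }
    return transient_size;
}

int main() {
    // A single 1 GiB rowset against a 256 MiB promotion size.
    std::vector<Rowset> candidates = {{2, 10, size_t{1} << 30}};
    std::vector<Rowset> picked;
    int n = pick_input_rowsets(candidates, size_t{256} << 20, &picked);
    std::cout << "picked " << n << " rowset(s)\n"; // prints: picked 1 rowset(s)
    return 0;
}

The test changes mirror this fix: instead of probing the backend's disable_auto_compaction config and tolerating a failed manual compaction when auto-compaction was enabled, both suites now set "disable_auto_compaction" = "true" as a table property and assert unconditionally that every triggered compaction reports success.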

74 changes: 42 additions & 32 deletions regression-test/suites/compaction/test_base_compaction.groovy
@@ -33,14 +33,6 @@ suite("test_base_compaction") {
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@
UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
"replication_num" = "1",
"disable_auto_compaction" = "true"
)
"""
@@ -84,7 +77,42 @@

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

time 10000 // limit inflight 10s

// the stream load action checks the result, including a Success status and NumberTotalRows == NumberLoadedRows

// if a check callback is declared, the default checks are skipped,
// so you must verify every condition yourself
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}

streamLoad {
// a default db 'regression_test' is specified in
// ${DORIS_HOME}/conf/regression-conf.groovy
table tableName

// the default label is a UUID; to set one explicitly:
// set 'label' UUID.randomUUID().toString()

// the default column_separator is specified in the Doris FE config, usually '\t';
// this line changes it to '|'
set 'column_separator', '|'
set 'compress_type', 'GZ'

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

time 10000 // limit inflight 10s

@@ -114,13 +142,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
@@ -154,7 +176,7 @@

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""

time 10000 // limit inflight 10s

@@ -182,13 +204,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
@@ -219,13 +235,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
76 changes: 43 additions & 33 deletions regression-test/suites/compaction/test_base_compaction_no_value.groovy
@@ -27,20 +27,12 @@ suite("test_base_compaction_no_value") {

backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@
UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
"replication_num" = "1",
"disable_auto_compaction" = "true"
)
"""
@@ -84,7 +77,42 @@

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

time 10000 // limit inflight 10s

// the stream load action checks the result, including a Success status and NumberTotalRows == NumberLoadedRows

// if a check callback is declared, the default checks are skipped,
// so you must verify every condition yourself
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}

streamLoad {
// a default db 'regression_test' is specified in
// ${DORIS_HOME}/conf/regression-conf.groovy
table tableName

// the default label is a UUID; to set one explicitly:
// set 'label' UUID.randomUUID().toString()

// the default column_separator is specified in the Doris FE config, usually '\t';
// this line changes it to '|'
set 'column_separator', '|'
set 'compress_type', 'GZ'

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""

time 10000 // limit inflight 10s

@@ -114,13 +142,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
@@ -154,7 +176,7 @@

// relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv;
// you can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""

time 10000 // limit inflight 10s

@@ -182,13 +204,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
@@ -219,13 +235,7 @@
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
