Hello.
I am developing a project on Hudi 0.14.1 with a custom payload class, and I am getting inconsistent data when using Merge On Read. With Copy On Write everything works, but with Merge On Read, Hudi calls the method getInsertValue before calling combineAndGetUpdateValue even when the row already exists. This is a problem for me because in getInsertValue I replace the value @#=BDP_N=#@ with null (when the row does not exist, that column should be null), while in combineAndGetUpdateValue I start from the previous row and only overwrite the modified columns; if an incoming column is null, I keep the previous value.
Let me explain with an example:
1) Insert a first row
spark.createDataFrame(Seq(("1",2L,"value1","value2","2",false))).toDF("key", "sort","field1","field2","partition","_hoodie_is_deleted").
write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, "pruebasc").
option("hoodie.datasource.write.payload.class","CustomOverwriteWithLatestAvroPayload").
option("hoodie.avro.schema.validate","false").
option("hoodie.datasource.write.recordkey.field","key").
option("hoodie.datasource.write.precombine.field","sort").
option("hoodie.datasource.write.new.columns.nullable", "true").
option("hoodie.datasource.write.reconcile.schema","true").
option("hoodie.metadata.enable","false").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.inline","true").
mode(Overwrite).
save("/tmp/pruebasc")
I get the following row:
key -> 1
sort -> 2
field1 -> value1
field2 -> value2
partition -> 2
_hoodie_is_deleted -> false
2) I update the row. The custom payload replaces the value @#=BDP_N=#@ with null, and the columns that were not modified keep their previous values.
spark.createDataFrame(Seq(("1",3L,"@#=BDP_N=#@","value3","2",false))).toDF("key", "sort","field1","field2","partition","_hoodie_is_deleted").withColumn("field1", col("field1").cast("String")).
write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, "pruebasc").
option("hoodie.datasource.write.payload.class","CustomOverwriteWithLatestAvroPayload").
option("hoodie.avro.schema.validate","false").
option("hoodie.datasource.write.recordkey.field","key").
option("hoodie.datasource.write.precombine.field","sort").
option("hoodie.datasource.write.new.columns.nullable", "true"). //newly added columns are set to null
option("hoodie.datasource.write.reconcile.schema","true"). //uses the schema of the existing data rather than the new record; everything except the PK must be nullable
option("hoodie.metadata.enable","false").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.inline","true").
mode(Append).
save("/tmp/pruebasc")
I get the following row:
key -> 1
sort -> 3
field1 -> value1
field2 -> value3
partition -> 2
_hoodie_is_deleted -> false
But the correct row should be:
key -> 1
sort -> 3
field1 -> null
field2 -> value3
partition -> 2
_hoodie_is_deleted -> false
I have checked my logs, and the problem is that with Merge On Read, Hudi calls the method getInsertValue before calling combineAndGetUpdateValue when the key already exists.
Regards
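The payload semantics described in the issue can be sketched with plain Scala Maps instead of Avro records. This is an illustrative model only: the sentinel and field names are taken from the issue, and nothing here is Hudi API — onInsert and onUpdate merely stand in for the getInsertValue and combineAndGetUpdateValue paths.

```scala
// Illustrative model of the custom payload's two code paths.
val Sentinel = "@#=BDP_N=#@"

// getInsertValue path: there is no previous row, so the sentinel simply becomes null (None).
def onInsert(row: Map[String, Option[String]]): Map[String, Option[String]] =
  row.map { case (k, v) => k -> v.filterNot(_ == Sentinel) }

// combineAndGetUpdateValue path: the sentinel clears a column to null,
// while null means "not modified" and keeps the previous value.
def onUpdate(prev: Map[String, Option[String]],
             incoming: Map[String, Option[String]]): Map[String, Option[String]] =
  incoming.map {
    case (k, Some(Sentinel)) => k -> None                    // explicit clear
    case (k, None)           => k -> prev.getOrElse(k, None) // keep old value
    case other               => other                        // plain overwrite
  }

val prev     = Map("field1" -> Option("value1"), "field2" -> Option("value2"))
val incoming = Map("field1" -> Option(Sentinel), "field2" -> Option("value3"))

// Copy On Write order: combine sees the raw sentinel and clears field1.
val cow = onUpdate(prev, incoming)

// Merge On Read order: the insert path runs first and swallows the sentinel,
// so combine sees null and wrongly keeps the previous value1.
val mor = onUpdate(prev, onInsert(incoming))
```

Under the Merge On Read call order the model reproduces exactly the row reported above: field1 keeps value1 instead of becoming null.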
There is no guarantee that #getInsertValue is invoked only after #combineAndGetUpdateValue; in fact, we need the value returned by #getInsertValue so that it can be handed over to #combineAndGetUpdateValue.
And why does it only happen with Merge On Read? Also, I have tested version 1.0.0-beta and it doesn't happen there (it works correctly, but we can't use a beta version in production, and it performs worse than the 0.x versions). With Copy On Write, and with version 1.0.0-beta, updates only call the method combineAndGetUpdateValue.
I don't understand why a method meant for inserting data needs to be called before an update; it means the two methods are not isolated from each other.
My guess is that we use #getInsertValue to deserialize the payload from the log files. My recollection of this code path may be fuzzy; since you have already tested it locally, you can trace the invocation chain.
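The maintainer's point can be restated as a property a custom payload must satisfy to behave identically under both table types: since the engine may materialize a record through the insert path before combining it, combining a pre-materialized record must give the same result as combining the raw one. A minimal self-contained check (plain Scala Maps modeling the two paths, not Hudi API; the sentinel and semantics are as described in the issue):

```scala
val Sentinel = "@#=BDP_N=#@"
type Row = Map[String, Option[String]]

// Model of the getInsertValue path: sentinel becomes null.
def insertValue(row: Row): Row =
  row.map { case (k, v) => k -> v.filterNot(_ == Sentinel) }

// Model of the combineAndGetUpdateValue path: sentinel clears, null keeps the old value.
def combine(prev: Row, incoming: Row): Row =
  incoming.map {
    case (k, Some(Sentinel)) => k -> None
    case (k, None)           => k -> prev.getOrElse(k, None)
    case other               => other
  }

// Order-insensitivity property: combining a record that already went through
// the insert path must equal combining the raw record.
def orderInsensitive(prev: Row, incoming: Row): Boolean =
  combine(prev, insertValue(incoming)) == combine(prev, incoming)

val prev     = Map("field1" -> Option("value1"), "field2" -> Option("value2"))
val incoming = Map("field1" -> Option(Sentinel), "field2" -> Option("value3"))

// The payload design in the issue violates the property, which is why
// Copy On Write and Merge On Read disagree on the result.
val safe = orderInsensitive(prev, incoming)  // false for this design
```

Any payload whose insert path is lossy in this way (here, erasing the sentinel) will observe different results depending on when the engine materializes the record.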