
[BUG] [FLINK] Delta log not updated with the parquet files created after the last checkpoint before the Flink job completed #3057

Open
galadrielwithlaptop opened this issue May 6, 2024 · 4 comments

@galadrielwithlaptop

Bug

Which Delta project/connector is this regarding?

  • Flink - 1.17.0
  • Delta - 3.0.0

Describe the problem

I have a Delta Flink program similar to this one: https://learn.microsoft.com/en-us/azure/hdinsight-aks/flink/use-flink-delta-connector . I enabled checkpointing with a 1 s interval; the job takes around 10 checkpoints, and the _delta_log gets updated 10 times. But after the 10th checkpoint, one more parquet file is created and I can see one more checkpoint being invoked. Then the job finished, but somehow the Delta log file was not updated with that last file. Please check the logs here.

2024-05-06 21:04:17.884 [] Sink: Global Committer (1/1)#0 INFO  flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=34e8a131,txnId=cac93905] Committed delta #9 to abfs://appmode@xy.dfs.core.windows.net/delta-session3/_delta_log
2024-05-06 21:04:17.884 [] Sink: Global Committer (1/1)#0 INFO  flink flink.sink.internal.committer.DeltaGlobalCommitter 450 Successfully committed transaction (appId='45470aec-412d-4f74-87b0-22aa50a8f468', checkpointId='10')
2024-05-06 21:04:17.886 [] Sink: Global Committer (1/1)#0 INFO  flink apache.flink.runtime.taskmanager.Task 1084 Sink: Global Committer (1/1)#0 (6e0af40e48232d9d6a2de4a6a20d497b_306d8342cb5b2ad8b53f1be57f65bee8_0_0) switched from RUNNING to FINISHED.

Is there some config I am missing?

ABFS storage

[screenshots of the ABFS storage contents omitted]

  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
@galadrielwithlaptop galadrielwithlaptop added the bug Something isn't working label May 6, 2024
@scottsand-db
Collaborator

Hi @galadrielwithlaptop - is there a simple coding example you can create that can reproduce this issue?

@galadrielwithlaptop
Author

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the row type of the table
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}
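
Note: the snippet above does not include the checkpoint configuration described in the issue text. For reference, a minimal sketch of how 1 s checkpointing would be enabled with the standard Flink API, using org.apache.flink.streaming.api.CheckpointingMode (the actual settings used in the job are not shown in this issue):

    // Sketch only: enable checkpointing with a 1 s interval in exactly-once mode,
    // matching the interval described in the issue.
    env.enableCheckpointing(1000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);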

@galadrielwithlaptop
Author

galadrielwithlaptop commented May 14, 2024

You can keep only a few data points (say 100) and use a checkpoint interval of, let's say, 5 seconds.

I am pretty sure the TMs are freed before the DeltaGlobalCommitter can complete.
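
To make that concrete, a minimal sketch of such a repro (fromSequence is used here as a stand-in for any small bounded source; the Delta sink would be wired up exactly as in the example above):

    // Hypothetical repro sketch: a ~100-record bounded job with a 5 s checkpoint interval.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000L);                    // 5 s interval, as suggested above
    DataStream<Long> source = env.fromSequence(0, 99); // only 100 records, then the job finishes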

@galadrielwithlaptop
Author

TM logs

2024-05-09 15:46:34.259 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaCommitter 103 Committing delta committable locally: appId=2ee0131e-257c-4466-8523-3b3df0333e88 checkpointId=13 deltaPendingFile=DeltaPendingFile(fileName=part-b7a731e2-610c-4035-b35d-ee5052c3cae7-12.snappy.parquet lastUpdateTime=1715269592562 fileSize=147467 recordCount=115818 partitionSpec={})
2024-05-09 15:46:34.291 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Loading version 10 starting from checkpoint version 10.
2024-05-09 15:46:34.291 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 [tableId=28bbe224-a5af-413d-ab4c-edee66f9d2b7] Created snapshot io.delta.standalone.internal.SnapshotImpl@1c9609eb
2024-05-09 15:46:34.332 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink connector.base.source.reader.SourceReaderBase 265 Closing Source Reader.
2024-05-09 15:46:34.336 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 1084 Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FINISHED.
2024-05-09 15:46:34.336 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 829 Freeing task resources for Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2024-05-09 15:46:34.336 [] flink-akka.actor.default-dispatcher-17 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1970 Un-registering task and sending final execution state FINISHED to JobManager for task Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-05-09 15:46:34.415 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 190 RecordReader initialized will read a total of 14 records.
2024-05-09 15:46:34.415 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 124 at row 0. reading next block
2024-05-09 15:46:34.435 [] Sink: Global Committer (1/1)#0 INFO flink apache.hadoop.io.compress.CodecPool 184 Got brand-new decompressor [.snappy]
2024-05-09 15:46:34.437 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 133 block read in memory in 22 ms. row count = 14
2024-05-09 15:46:34.461 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the protocol from the protocolMetadataHint: Protocol(1,2)
2024-05-09 15:46:34.461 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the metadata from the protocolMetadataHint: Metadata(28bbe224-a5af-413d-ab4c-edee66f9d2b7,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"f0","type":"string","nullable":true,"metadata":{}},{"name":"f1","type":"integer","nullable":true,"metadata":{}},{"name":"f2","type":"boolean","nullable":true,"metadata":{}}]},List(),Map(),Some(1715269582326))
2024-05-09 15:46:34.462 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Updated snapshot to io.delta.standalone.internal.SnapshotImpl@1c9609eb
2024-05-09 15:46:34.462 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 461 0 files to be committed to the Delta table for appId=2ee0131e-257c-4466-8523-3b3df0333e88 checkpointId=12.
2024-05-09 15:46:34.463 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 446 Attempting to commit transaction (appId='2ee0131e-257c-4466-8523-3b3df0333e88', checkpointId='12')
2024-05-09 15:46:34.464 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=28bbe224,txnId=21cbc92d] Attempting to commit version 11 with 3 actions with Serializable isolation level
2024-05-09 15:46:34.660 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Loading version 11 starting from checkpoint version 10.
2024-05-09 15:46:34.660 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 [tableId=28bbe224-a5af-413d-ab4c-edee66f9d2b7] Created snapshot io.delta.standalone.internal.SnapshotImpl@24561802
2024-05-09 15:46:34.754 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 190 RecordReader initialized will read a total of 14 records.
2024-05-09 15:46:34.754 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 124 at row 0. reading next block
2024-05-09 15:46:34.770 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 133 block read in memory in 16 ms. row count = 14
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the protocol from the protocolMetadataHint: Protocol(1,2)
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the metadata from the protocolMetadataHint: Metadata(28bbe224-a5af-413d-ab4c-edee66f9d2b7,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"f0","type":"string","nullable":true,"metadata":{}},{"name":"f1","type":"integer","nullable":true,"metadata":{}},{"name":"f2","type":"boolean","nullable":true,"metadata":{}}]},List(),Map(),Some(1715269582326))
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Updated snapshot to io.delta.standalone.internal.SnapshotImpl@24561802
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=28bbe224,txnId=21cbc92d] Committed delta #11 to abfs://appmode@xx.dfs.core.windows.net/delta-session3/_delta_log
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 450 Successfully committed transaction (appId='2ee0131e-257c-4466-8523-3b3df0333e88', checkpointId='12')
2024-05-09 15:46:34.777 [] Sink: Global Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 1084 Sink: Global Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0) switched from RUNNING to FINISHED.
2024-05-09 15:46:34.777 [] Sink: Global Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 829 Freeing task resources for Sink: Global Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0).
2024-05-09 15:46:34.778 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1970 Un-registering task and sending final execution state FINISHED to JobManager for task Sink: Global Committer (1/1)#0 1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0.
2024-05-09 15:46:35.896 [] flink-akka.actor.default-dispatcher-15 INFO flink flink.runtime.taskexecutor.slot.TaskSlotTableImpl 439 Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=1.793gb (1925342788 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.534gb (1647648462 bytes), networkMemory=392.830mb (411912115 bytes)}, allocationId: 837710e86c63b1ede1d4018b8e363911, jobId: d7c6078f8d1229e4dee6a953c570bdf5).
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.DefaultJobLeaderService 170 Remove job d7c6078f8d1229e4dee6a953c570bdf5 from job leader monitoring.
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService 106 Stopping DefaultLeaderRetrievalService.
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 91 Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-6b6e31dcb2094950ab531094e84fd1b5-cluster-config-map'}.
2024-05-09 15:46:35.898 [] KubernetesClient-Informer-thread-1 INFO flink flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer 178 Stopped to watch for 6b6e31dcb2094950ab531094e84fd1b5/flink-6b6e31dcb2094950ab531094e84fd1b5-cluster-config-map, watching id:8d643051-f6a5-4e2c-a913-024c655e5487
2024-05-09 15:46:35.899 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1746 Close JobManager connection for job d7c6078f8d1229e4dee6a953c570bdf5.
