
Glue Schema Registry: Throttling exception when trying to register schema version for multiple tables #11045

Open
phuongvu opened this issue May 7, 2024 · 10 comments
Assignees
Labels
affects-6.5 affects-7.1 affects-7.5 affects-8.1 area/ticdc Issues or PRs related to TiCDC. severity/moderate This is a moderate bug. type/bug This is a bug.

Comments

@phuongvu

phuongvu commented May 7, 2024

What did you do?

We created:

  1. A TiCDC changefeed that listens to non-DDL changes from tables matching test_* and sinks them to a single Kafka topic using the Avro protocol.
  2. Multiple tables with the same schema, e.g., test_1, test_2, ..., test_n.

Whenever these tables are created, the changefeed tasks try to register the schema version for all of the tables at the same time, which causes rate-limit errors.

What did you expect to see?

The schema version should not be updated concurrently for the same schema:
https://github.com/pingcap/tiflow/blob/master/pkg/sink/codec/avro/avro.go#L154

What did you see instead?

operation error Glue: RegisterSchemaVersion, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: c85a2ee3-b98b-459e-a621-d58a89b045bd, api error ThrottlingException: Rate exceeded

operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 45b24c85-eb3c-4936-bc37-4e46f18efd61, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1:131234521375:schema/staging_cdc/staging_cdc_v0-key

Pausing and resuming the changefeed sometimes fixes this, but I was hoping it could be handled within TiCDC.

Versions of the cluster

Upstream TiDB cluster version (execute SELECT tidb_version(); in a MySQL client):

Release Version: v7.5.1
Edition: Community
Git Commit Hash: 7d16cc79e81bbf573124df3fd9351c26963f3e70
Git Branch: heads/refs/tags/v7.5.1
UTC Build

Upstream TiKV version (execute tikv-server --version):

TiKV
Release Version:   8.0.0
Edition:           Community
Git Commit Hash:   9f51dfed1f04224a746facfbd919ae8ebec639eb
Git Commit Branch: HEAD
UTC Build Time:    2024-03-28 16:29:18
Rust Version:      rustc 1.77.0-nightly (89e2160c4 2023-12-27)
Enable Features:   pprof-fp jemalloc mem-profiling portable test-engine-kv-rocksdb test-engine-raft-raft-engine trace-async-tasks openssl-vendored
Profile:           dist_release

TiCDC version (execute cdc version):

Release Version: v8.0.0
Git Commit Hash: b81378bc6610bbf39af8c09a2df8f89faa0d537c
Git Branch: HEAD
UTC Build Time: 2024-03-28 16:11:33
Go Version: go version go1.21.4 linux/arm64
Failpoint Build: false
@phuongvu phuongvu added area/ticdc Issues or PRs related to TiCDC. type/bug This is a bug. labels May 7, 2024
@github-actions github-actions bot added this to Need Triage in Question and Bug Reports May 7, 2024
@phuongvu phuongvu changed the title Glue: RegisterSchemaVersion ThrottlingException Glue Schema Registry: Throttling exception when trying to register schema version for multiple tables May 7, 2024
@phuongvu
Author

phuongvu commented May 8, 2024

Hey @asddongmen, since you implemented the Glue integration, would you mind sharing some insights on this issue? Maybe there is a workaround? Thank you!

@phuongvu
Author

phuongvu commented May 8, 2024

Some logs
May 08 19:18:34 ip-.ec2.internal cdc[541201]: [2024/05/08 19:18:34.426 +00:00] [INFO] [glue_schema_registry.go:108] ["Schema already exists in registry, update it"] [schemaName=stats_events_db_conv_staging_cdc_v0-key] May 08 19:18:34 ip-.ec2.internal cdc[541201]: [2024/05/08 19:18:34.464 +00:00] [ERROR] [glue_schema_registry.go:219] ["GetCachedOrRegister: Could not register schema"] [error="operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 78473ce4-3981-4230-89ac-bcd16f57f21c, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1::schema/staging_cdc/staging_cdc_v0-key"] [errorVerbose="operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 78473ce4-3981-4230-89ac-bcd16f57f21c, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1::schema/staging_cdc/staging_cdc_v0-key\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174\ngithub.com/pingcap/errors.Trace\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/juju_adaptor.go:15\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).updateSchema\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:289\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).Register\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:109\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).GetCachedOrRegister\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:217\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).getKeySchemaCodec\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:154\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).encodeKey\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:89\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).AppendRowChangedEvent\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:215\ngithub.com/pingcap/tiflow/pkg/sink/codec.(*encoderGroup).runEncoder\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/encoder_group.go:143\ngithub.com/pingcap/tiflow/pkg/sink/codec.(*encoderGroup).Run.func2\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/encoder_group.go:115\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\tgolang.org/x/sync@v0.6.0/errgroup/errgroup.go:78\nruntime.goexit\n\truntime/asm_arm64.s:1197"]

@asddongmen
Contributor

asddongmen commented May 9, 2024

It appears that this issue is caused by all the tables sinking to a single Kafka topic, so they share one schemaName while having different tableVersion values. This often makes the cache check at

if entry, exists := m.cache[schemaName]; exists && entry.tableVersion == tableVersion {

fail, since the tableVersion values differ, and a new schema version is then registered with the schema registry.

A possible workaround is to sink different tables to different topics.
You can refer to: https://docs.pingcap.com/tidb/dev/ticdc-sink-to-kafka#topic-dispatchers
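For illustration only, a topic dispatcher rule along the lines below routes each table to its own topic, and therefore to its own schema name in Glue. The matcher and topic expression are placeholders rather than your actual config, and Avro imposes extra constraints on valid topic expressions, so please check the linked doc:

[sink]
dispatchers = [
  # placeholder rule: each matched table gets its own topic, e.g. test_test_1
  {matcher = ['test.test_*'], topic = "{schema}_{table}", partition = "index-value"},
]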

@phuongvu

@3AceShowHand
Contributor

  • The Avro protocol only encodes DML events.
  • The schema key is topicName + suffix.

If multiple tables are dispatched to the same topic, they all share the same schema key.

The encoder group runs 32 encoders concurrently, and each one may fetch or register the schema independently, which may cause this issue.
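A possible mitigation on the TiCDC side (only a sketch of one approach, not what the current code does) would be to collapse concurrent registrations for the same schema key into a single in-flight request, for example with golang.org/x/sync/singleflight. The registerSchema helper below is hypothetical:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/singleflight"
)

// registerSchema is a hypothetical stand-in for the Glue
// RegisterSchemaVersion call that each encoder makes today.
func registerSchema(ctx context.Context, schemaName string) (string, error) {
    // ... call the Glue schema registry here ...
    return "schema-version-id", nil
}

var registerGroup singleflight.Group

// getOrRegister lets concurrent encoders share one in-flight
// registration per schema name instead of issuing 32 separate calls.
func getOrRegister(ctx context.Context, schemaName string) (string, error) {
    v, err, _ := registerGroup.Do(schemaName, func() (interface{}, error) {
        return registerSchema(ctx, schemaName)
    })
    if err != nil {
        return "", err
    }
    return v.(string), nil
}

func main() {
    id, _ := getOrRegister(context.Background(), "staging_cdc_v0-key")
    fmt.Println(id)
}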

@fubinzh

fubinzh commented May 9, 2024

/severity moderate

@asddongmen
Contributor

@phuongvu Could you please provide the changefeed's config to help us investigate further?

@phuongvu
Author

phuongvu commented May 9, 2024

It appears that this issue is due to all the tables sinking to a single Kafka topic,

Thank you guys for the insight! It makes sense that my setup causes this issue :( I'll go with the workaround!

Here is my config as requested:

cdc cli changefeed create --changefeed-id="staging-v0" --server=http://localhost:2017 --sink-uri="kafka://localhost:9092/staging_cdc.v0?protocol=avro&partition-num=24&compression=zstd&max-message-bytes=67108864&replication-factor=2&enable-tidb-extension=true&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --config /etc/tikv/db-conv-staging.toml

With the changefeed.toml config:

case-sensitive = false
changefeed-error-stuck-duration = "30m"
[mounter]
[filter]
rules = ['sa_events.events_conv_staging_*']
[[filter.event-filters]]
ignore-event = ["all ddl"] # Ignore all ddl events.

[scheduler]
enable-table-across-nodes = true
region-threshold = 10000
# write-key-threshold = 30000

[sink]
dispatchers = [
  {matcher = ['sa_events.events_conv_staging_*'], partition = "index-value"},
]

protocol = "avro"

[sink.kafka-config.codec-config]

[integrity]
integrity-check-level = "correctness"

@phuongvu
Author

phuongvu commented May 9, 2024

It might not be related, but I tried sinking all the tables to a single Kafka topic using canal-json (since it doesn't require a schema registry), and I got an error doing that as well. So maybe this limitation (i.e., sinking all the tables to a single Kafka topic) also applies to other protocols (e.g., canal-json in this case).

[
  {
    "id": "staging-v0",
    "namespace": "default",
    "summary": {
      "state": "warning",
      "tso": 449651699691815041,
      "checkpoint": "2024-05-09 20:05:09.298",
      "error": {
        "time": "2024-05-09T20:18:56.453120599Z",
        "addr": "172.30.67.215:2017",
        "code": "CDC:ErrKafkaSendMessage",
        "message": "[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."
      }
    }
  }
]

Interestingly, the error message Failed to deliver 24 messages. coincides with what I was trying to do: sinking 24 tables to one topic using canal-json.

# https://docs.pingcap.com/tidb/v8.0/ticdc-changefeed-config
case-sensitive = false
changefeed-error-stuck-duration = "30m"

[filter]
rules = ['sa_events.events_conv_staging_*']

[[filter.event-filters]]
ignore-event = ["all ddl","delete"] # Ignore all ddl events.

[scheduler]
enable-table-across-nodes = true
region-threshold = 1000
# write-key-threshold = 30000

[sink]
dispatchers = [
   {matcher = ['sa_events.events_conv_staging_*'], partition = "index-value"},
]

protocol = "canal-json"

Some logs:

- May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [WARN] [options.go:608] ["topic's `max.message.bytes` less than the `max-message-bytes`,use topic's `max.message.bytes` to initialize the Kafka producer"] [max.message.bytes=1048588] [max-message-bytes=67108864] [real-max-message-bytes=1048460]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [WARN] [options.go:623] ["topic already exist, TiCDC will not create the topic"] [topic=stats.staging_cdc.v0] [detail="{"Name":"stats.staging_cdc.v0","NumPartitions":24,"ReplicationFactor":0}"]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [kafka_manager.go:136] ["store topic partition number"] [namespace=default] [changefeed=conv-staging-v0] [topic=stats.staging_cdc.v0] [partitionNumber=24]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [tz.go:43] ["Load the timezone specified by the user"] [timezoneName=UTC] [timezone=UTC]
- May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [kafka_ddl_sink.go:107] ["Try to create a DDL sink producer"] [changefeed=default/conv-staging-v0]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [sarama.go:96] ["Kafka producer uses zstd compression algorithm"]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.963 +00:00] [INFO] [kafka_ddl_sink.go:116] ["DDL sink producer client created"] [duration=1.73655ms]
  May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.969 +00:00] [WARN] [ddl_sink.go:170] ["owner ddl sink fails on action"] [namespace=default] [changefeed=conv-staging-v0] [action=writeCheckpointTs] [retryable=true] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages.ngithub.com/pingcap/errors.AddStackntgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:34\ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer.(*kafkaDDLProducer).SyncBroadcastMessagentgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go:74ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq.(*DDLSink).WriteCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/mq_ddl_sink.go:180\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).writeCheckpointTs.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:227ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).retrySinkAction\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:166\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).observedRetrySinkAction.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:198nruntime.goexitntruntime/asm_arm64.s:1197"]
  May 09 19:21:23 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:23.003 +00:00] [WARN] [changefeed.go:325] ["an warning occurred in Owner"] [namespace=default] [changefeed=conv-staging-v0] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages.ngithub.com/pingcap/errors.AddStackntgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:34\ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer.(*kafkaDDLProducer).SyncBroadcastMessagentgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go:74ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq.(*DDLSink).WriteCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/mq_ddl_sink.go:180\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).writeCheckpointTs.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:227ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).retrySinkAction\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:166\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).observedRetrySinkAction.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:198nruntime.goexitntruntime/asm_arm64.s:1197"]
  May 09 19:21:23 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:23.583 +00:00] [INFO] [kafka_manager.go:181] ["Kafka admin client describe topics success"] [namespace=default] [changefeed=conv-staging-v0] [duration=3.774µs]

@asddongmen
Contributor

@phuongvu

May I ask, did the workaround for the Avro protocol work?

Regarding your question about the canal-json protocol, it shouldn't be impacted by this issue. I have not been able to reproduce it in my local environment.

If possible, providing the complete TiCDC log would help us investigate this further.

@phuongvu
Author

Hey @asddongmen, we have put the project that's going to use TiCDC on hold for now, so I haven't had a chance to try the workaround yet, but I think it should work. Re: the canal-json log, I'll try to find it and post it here.

@asddongmen asddongmen self-assigned this May 15, 2024