Questions about how to use Kafka Streams with Confluent.Kafka Headers #283

Closed Answered by LGouellec
ncupertino asked this question in Q&A
Hi @ncupertino,

You can add or remove headers by calling StreamizMetadata.GetCurrentHeadersMetadata() inside the lambda you pass to Map(...), MapValues(...), or a similar operator:

var builder = new StreamBuilder();

builder.Stream<string, string>("topic-in", new StringSerDes(), new StringSerDes())
    // Log each incoming record before it is transformed.
    .Peek((k, v) => Console.WriteLine($" K: {k} V: {v}"))
    .Map((key, value) => {
        // Headers of the record currently being processed.
        StreamizMetadata
            .GetCurrentHeadersMetadata()
            .Add("headerKey", new byte[] { 0x01, 0x02, 0x03 });

        // Drop the key (null) and pass the value through unchanged.
        return KeyValuePair.Create<string, string>(null, value);
    })
    .To("t…
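
The answer mentions removing headers as well as adding them, but the snippet only shows Add. Below is a minimal sketch of the add, read, and remove calls on a Confluent.Kafka Headers collection. It assumes that GetCurrentHeadersMetadata() returns such a Headers instance, as the discussion title suggests; the collection is built by hand here so the sketch runs without a broker, and the "trace-id" header name and its value are hypothetical.

```csharp
using System;
using System.Text;
using Confluent.Kafka;

// Stand-in for the collection obtained from
// StreamizMetadata.GetCurrentHeadersMetadata() inside a Map/MapValues lambda
// (assumption: that call returns a Confluent.Kafka Headers instance).
var headers = new Headers();

// Add: header values are raw bytes, so strings must be encoded explicitly.
headers.Add("headerKey", new byte[] { 0x01, 0x02, 0x03 });
headers.Add("trace-id", Encoding.UTF8.GetBytes("abc-123")); // hypothetical header

// Read: TryGetLastBytes returns false when no header has the given key.
if (headers.TryGetLastBytes("trace-id", out byte[] raw))
{
    Console.WriteLine($"trace-id = {Encoding.UTF8.GetString(raw)}");
}

// Remove: drops every header stored under the given key.
headers.Remove("headerKey");

Console.WriteLine($"headers left: {headers.Count}");
```

Inside a topology, the same Add, TryGetLastBytes, and Remove calls would be made on the object returned by GetCurrentHeadersMetadata() rather than on a hand-built collection.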

Answer selected by ncupertino