Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add demo template for transactional client #15913

Draft
wants to merge 2 commits into
base: trunk
Choose a base branch
from

Conversation

k-raina
Copy link

@k-raina k-raina commented May 9, 2024

This is example code template for Transactional Client. This code assumes that new Exception types have already been implemented.

Copy link
Contributor

@artemlivshits artemlivshits left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling LGTM

ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));

// Process records to generate word count map
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to the point of the example, but why we only look for partition 0?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in commit: 920437b

// Commit transaction
producer.commitTransaction();
} catch (AbortableTransactionException e) {
// Abortable Exception: Handle Kafka exception by aborting transaction. Abortable Exception should never be thrown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Abortable Exception should never be thrown." -- is the desired message that the abortTransaction never throws abortable exceptions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in commit: 920437b

} catch (InvalidConfiguationTransactionException e) {
// Fatal Error: The error is bubbled up to the application layer. The application can decide what to do
closeAll();
throw InvalidConfiguationTransactionException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be throw e;?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in commit: 920437b


boolean isRunning = true;
// Continuously poll for records
while(isRunning) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: while (isRunning)

// Commit transaction
producer.commitTransaction();
} catch (AbortableTransactionException e) {
// Abortable Exception: Handle Kafka exception by aborting transaction. AbortTransaction path never throws abortable exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a implementation discussion, but are we saying that producer.abortTransaction() should never throw such an exception? Or that we don't ever try to catch such an exception from abortTransaction?


import static java.time.Duration.ofSeconds;
import static java.util.Collections.singleton;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whenever we want to merge this, we can't use wildcard imports.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants