New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add demo template for transactional client #15913
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling LGTM
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60)); | ||
|
||
// Process records to generate word count map | ||
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to the point of the example, but why we only look for partition 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in commit: 920437b
// Commit transaction | ||
producer.commitTransaction(); | ||
} catch (AbortableTransactionException e) { | ||
// Abortable Exception: Handle Kafka exception by aborting transaction. Abortable Exception should never be thrown. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Abortable Exception should never be thrown." -- is the desired message that the abortTransaction never throws abortable exceptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in commit: 920437b
} catch (InvalidConfiguationTransactionException e) { | ||
// Fatal Error: The error is bubbled up to the application layer. The application can decide what to do | ||
closeAll(); | ||
throw InvalidConfiguationTransactionException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be throw e;
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in commit: 920437b
|
||
boolean isRunning = true; | ||
// Continuously poll for records | ||
while(isRunning) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: while (isRunning)
// Commit transaction | ||
producer.commitTransaction(); | ||
} catch (AbortableTransactionException e) { | ||
// Abortable Exception: Handle Kafka exception by aborting transaction. AbortTransaction path never throws abortable exception. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More of a implementation discussion, but are we saying that producer.abortTransaction() should never throw such an exception? Or that we don't ever try to catch such an exception from abortTransaction?
|
||
import static java.time.Duration.ofSeconds; | ||
import static java.util.Collections.singleton; | ||
import static org.apache.kafka.clients.consumer.ConsumerConfig.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: whenever we want to merge this, we can't use wildcard imports.
This is example code template for Transactional Client. This code assumes that new Exception types have already been implemented.