Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-3759 Additonal Trident Kafka Spout Metrics #3385

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

jmpoholarz
Copy link

What is the purpose of the change

The Trident Kafka Spout/Emitter does not measure a few metrics that my team has found useful over the years using the spout. This PR adds these couple metrics back to the open source.

  1. Event emit rate
    Having a count of how many events are being emitted by the spout can track the rate at which the spout is consuming from the kafka topic(s). If this rate decreases, it signals performance issues to investigate.
    (@kishorvpatil suggested using a meter here. The initial thought was to just use a Counter or a Gauge which resets to 0 every time getValue() is called)

  2. Kafka Spout Max Lag
    This gauge value monitors the backlog of events to determine if the spout is not emitting fast enough to keep up with the incoming data or if the spout is current and Kafka topic delayed. This metric can be used to distinguish between these two cases and determine the source of the slowdown.

How was the change tested

The unit tests for the emitter verify the counts match the expected values.
Additionally, my team compiled the storm-kafka-client package and included it as a dependency in our project. After launching our topologies, we verified that the metric values on our YAMAS charts matched the equivalent metrics we had hardcoded into our custom Kafka emitter.

Copy link
Contributor

@Ethanlm Ethanlm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please file a JIRA and prepend JIRA ID to the PR title, and commit message, for better tracking purposes?

see more at https://github.com/apache/storm/blob/master/DEVELOPER.md

@@ -60,6 +65,14 @@
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

// Metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all there variables be private? If some is used in unit test, use the modifiers with the smallest scope as possible and add @VisibleForTesting annotation to indicate that.

public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
protected transient Gauge<Double> kafkaClientMaxLag;
public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove "Rate" from the metric name and variable name.

A meter has metrics like xx.m1_rate, xx.m15_rate, .count. Rate in the metric name is redundant and somewhat confusing.

// Extract spout lag from consumer's internal metrics
for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
Metric metric = metricKeyVal.getValue();
if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, there are actually two types of "records-lag-max" metrics,

one is partition level
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L133-L134

and the other one is consumer level
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L98-L99

They use the same name as "records-lag-max", while the tags are different. In this case, it is hard to tell what the first "records-lag-max" metric from consumer.metrics() is.

I noticed that older kafka version has different way of doing things .
https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L108

We will to take care of compatibility issues here.

public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
protected transient Gauge<Double> kafkaClientMaxLag;
public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
protected transient Meter eventEmitRate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the builtin emitted metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This builtin might be sufficient. Our metric names seem to be reporting values like __emit-count-s1 or __emit-count-s2 (there are almost 200 of these on our Yamas metric search, I'm guessing one per stream). Perhaps something is mis-configured in our topologies preventing it from extracting useful stream names.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the recent change on metrics in storm (converting to v2 metrics), stream name will be appended to the metric name.

The tuple counting metric names contain "${stream_name}" or "${upstream_component}:${stream_name}". The former is used for all spout metrics and for outgoing bolt metrics (__emit-count and __transfer-count). The latter is used for bolt metrics that deal with incoming tuples.

DimensionalReporter can be used to separate dimensions (stream_name, componentId, etc) from metrics.

@jmpoholarz jmpoholarz changed the title Add Metrics for Trident Kafka Spout/Emitter STORM-3759 Additonal Trident Kafka Spout Metrics Mar 22, 2021
@rzo1
Copy link
Contributor

rzo1 commented Dec 4, 2023

@avermeer Would that be interesting for your topologies?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants