[ES-1122621] fix receive connection re-establish taking too long (5m) #36
Conversation
Signed-off-by: Yi Jin <yi.jin@databricks.com>
awesome debugging.
pkg/receive/handler.go
Outdated
```go
if err := p.closeUnlocked(addr); err != nil {
	return err
}
```
Should it remember the error, but continue and try to close all connections?
yep, we should do this.
pkg/receive/handler.go
Outdated
```go
	return c, nil
}
level.Debug(p.logger).Log("msg", "dialing peer", "addr", addr)
conn, err := p.dialer(ctx, addr, p.dialOpts...)
```
I searched the repo, and this is the only place where `p.dialer` is called. That means the handler never re-establishes a connection after a DNS change. Before your change, the write errors eventually die down, which means the gRPC framework eventually picks up the correct IP and re-establishes the connection under the hood, but the delay is quite long and causes many write errors.
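The dial-once behavior described above can be sketched with a minimal connection cache (hypothetical types and names; `conn` stands in for `*grpc.ClientConn`, and the real handler's `peerGroup` is more involved):

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *grpc.ClientConn (illustration only).
type conn struct{ addr string }

// peerGroup caches one connection per address; the dialer runs only on
// first use, mirroring how the receive handler calls p.dialer exactly
// once per address and then reuses the cached connection.
type peerGroup struct {
	mu     sync.Mutex
	dialer func(addr string) (*conn, error)
	conns  map[string]*conn
}

func (p *peerGroup) get(addr string) (*conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[addr]; ok {
		// Cached: the dialer (and thus DNS resolution) never runs
		// again, even if the name now points at a different IP.
		return c, nil
	}
	c, err := p.dialer(addr)
	if err != nil {
		return nil, err
	}
	p.conns[addr] = c
	return c, nil
}

func main() {
	dials := 0
	p := &peerGroup{
		dialer: func(addr string) (*conn, error) {
			dials++
			return &conn{addr: addr}, nil
		},
		conns: map[string]*conn{},
	}
	p.get("ingestor.svc:19291")
	p.get("ingestor.svc:19291")
	fmt.Println(dials) // prints 1
}
```

Because the cache never invalidates entries, recovery after an IP change depends entirely on gRPC's own resolver and idle handling.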
I'm concerned that there might be a reason that the gRPC call site (receive handler) doesn't explicitly dial again. Should we ask the community?
Actually, the default idle timeout of 5 minutes is the root cause; I was able to pass the unit tests with a 1-second idle timeout.
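A sketch of shortening that idle timeout via grpc-go's `WithIdleTimeout` dial option (a fragment, not a full program; the 1s value mirrors what made the unit tests pass here and is illustrative, not a production recommendation):

```go
// After the idle timeout, grpc-go tears down the idle connection; the
// next RPC re-dials, which re-resolves DNS and picks up the new IP.
dialOpts = append(dialOpts, grpc.WithIdleTimeout(1*time.Second))
```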
pkg/receive/handler.go
Outdated
```go
if !ok {
	p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
} else {
	p.connections[addr].cc = conn
```
What if, in reality (in contrast to the unit test), re-dialing still sees the stale IP because of some weird interaction between gRPC's DNS resolver and Kubernetes networking (CoreDNS)?
plan to revert this one.
```diff
@@ -1735,3 +1785,91 @@ func TestHandlerFlippingHashrings(t *testing.T) {
 	cancel()
 	wg.Wait()
 }
+
+func TestIngestorRestart(t *testing.T) {
```
Awesome unit test!
```diff
@@ -158,6 +158,7 @@ func runReceive(
 	dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression)))
 }
 if receiveMode == receive.RouterOnly {
+	dialOpts = append(dialOpts, grpc.WithIdleTimeout(time.Duration(*conf.maxBackoff)))
```
Is there a cmd flag to change `conf.maxBackoff`? If not, better to add one so that we can tune it later without rebuilding an image.
Yes, it comes from a cmd flag.
pkg/receive/handler.go
Outdated
```go
defer p.m.Unlock()
var err error
for addr := range p.connections {
	err = p.closeUnlocked(addr)
```
Should be:

```go
if er := p.closeUnlocked(addr); er != nil {
	err = er
}
```
Updated to use multi-errors.
Signed-off-by: Yi Jin <yi.jin@databricks.com>
We've seen some 503 errors when quorum is supposed to be met: the ingestor is in the running state, yet the router still can't talk to it. I was able to reproduce this behavior locally.

I've introduced a unit test to mock the prod environment:

- start a router on `ip1` and 1 ingestor behind a DNS name that resolves to `ip2`
- kill the ingestor on `ip2`, and write requests start to fail
- start a new ingestor on `ip3` and bind it to the previous DNS name (which resolved to `ip2`) to simulate ingestor restarts
- without the change in `handler.go`, the unit test won't succeed even after a while
- with the change in `handler.go`, the unit test succeeds quickly

Changes
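The restart scenario above can be sketched with a stub resolver standing in for DNS (CoreDNS in the real environment); everything here is illustrative, not the PR's test code:

```go
package main

import "fmt"

// resolver is a stub for DNS: a mutable map from name to IP. The names
// (ingestor.svc, ip2, ip3) mirror the test scenario and are illustrative.
type resolver struct{ records map[string]string }

func (r *resolver) lookup(name string) string { return r.records[name] }

func main() {
	r := &resolver{records: map[string]string{"ingestor.svc": "ip2"}}

	// A connection caches the IP it resolved at dial time.
	cachedIP := r.lookup("ingestor.svc")

	// Ingestor "restarts": same DNS name, new IP.
	r.records["ingestor.svc"] = "ip3"

	// Until the cached connection is closed and re-dialed (or the idle
	// timeout tears it down), writes still target the dead peer.
	fmt.Println(cachedIP == r.lookup("ingestor.svc")) // prints false
}
```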
Verification
Before the change:
After the change, with the rollout operator reduced to a 5s delay between StatefulSets, we still see a small number of 503s, but the M3 write-coordinator retries should help; with somewhat longer delays, like 1m, there should not be any 503s: