-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix head stats and hooks when replaying a corrupted snapshot #14079
Fix head stats and hooks when replaying a corrupted snapshot #14079
Conversation
f8d34ad
to
6ee4190
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks make sense to me.
In Cortex, we rely on series lifecycle callback to keep track of active series. We hit this series double counting bug when getting a corrupted chunk and active series reached limit because post delete hook was not called.
cb8c835
to
61bf6b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this; looks fairly good but some comments from the bug-scrub meeting.
tsdb/head.go
Outdated
totalSeries += len(deletedForCallback) | ||
deletedFromPrevStripe = len(deletedForCallback) | ||
} | ||
s.series = make([]map[chunks.HeadSeriesRef]*memSeries, s.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems a no-op because it is overwritten on line 319?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah.. it is.. i just wanted to do that for correctness of the "reset" method - reset seems that we are resetting the struct to the initial state (empty) and could be reused. WDYT?
I can no do that and rename the method to something else (maybe flush? clean?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the code:
- Renamed the method from reset to flush
- Removed the extra logic to clean the series
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can still see s.series = make([]map[chunks.HeadSeriesRef]*memSeries, s.size)
, is it still needed?
I added some comments below regarding iter
, below.
As its invocation would become simpler, we can just move it directly to resetInMemoryState()
with a comment.
(And entrust the task of naming to the person who will extract this code in the future :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My suggestion: get rid of flush()
and inline it in resetInMemoryState()
without this allocation. Or remove this line of s.series = ...
and probably rename this function to callPostDeletionForAll()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.. make sense!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.. i just did that! its better indeed!
tsdb/head_test.go
Outdated
@@ -4007,24 +4008,44 @@ func TestSnapshotError(t *testing.T) { | |||
require.NoError(t, err) | |||
f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0) | |||
require.NoError(t, err) | |||
_, err = f.WriteAt([]byte{0b11111111}, 18) | |||
// lets corrupt middle of the snapshot, so we can replay some entries | |||
_, err = f.WriteAt([]byte{0b11111111}, 300) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the change from 18 to 300 significant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah.. it is..
If we corrupt the byte 18, we will not be able to restore any series (and so we cannot see the problem).
Corrupting the byte 300, we are able to restore 2 timeseries before reaching the corrupted position, and so, highlight the problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we don’t make it appear as if we’ve deleted a test (we'd want to continue running the other checks when no series has been restored) Let’s add that as another scenario. You can simply recreate a head at the end and run the new checks related to callbacks, or even better, run all checks for the two cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will need to create a new snapshot as the old one is already corrupted in the beginning. Will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I backed up the snapshot and restored to create the other test case! PTAL?
0b2ec4b
to
70a26fd
Compare
@bboreham PTAL? |
@codesome is this something you could help review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
tsdb/head.go
Outdated
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile | ||
} | ||
|
||
func (s *stripeSeries) iter(f func(int, uint64, *memSeries, map[chunks.HeadSeriesRef]labels.Labels), endShard func(map[chunks.HeadSeriesRef]labels.Labels)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can make it more specific (by renaming it deleteFunc
or iterForDeletion
or sth else + some comments), call PostDeletion inside it and make it return the number of deleted series.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can also note in the comments what f
should do with the map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not have "deletion" in the name, because this function in itself has nothing to do with deletion. The users of this just happen to use it for deletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we dont have deletion on the name we cannot call the post deletion hook inside the function! =/ What u guys think is better here? TBH i like the iterForDeletion
as it makes the code cleaner!
tsdb/head.go
Outdated
totalSeries += len(deletedForCallback) | ||
deletedFromPrevStripe = len(deletedForCallback) | ||
} | ||
s.series = make([]map[chunks.HeadSeriesRef]*memSeries, s.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can still see s.series = make([]map[chunks.HeadSeriesRef]*memSeries, s.size)
, is it still needed?
I added some comments below regarding iter
, below.
As its invocation would become simpler, we can just move it directly to resetInMemoryState()
with a comment.
(And entrust the task of naming to the person who will extract this code in the future :))
tsdb/head_test.go
Outdated
@@ -4007,24 +4008,44 @@ func TestSnapshotError(t *testing.T) { | |||
require.NoError(t, err) | |||
f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0) | |||
require.NoError(t, err) | |||
_, err = f.WriteAt([]byte{0b11111111}, 18) | |||
// lets corrupt middle of the snapshot, so we can replay some entries | |||
_, err = f.WriteAt([]byte{0b11111111}, 300) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we don’t make it appear as if we’ve deleted a test (we'd want to continue running the other checks when no series has been restored) Let’s add that as another scenario. You can simply recreate a head at the end and run the new checks related to callbacks, or even better, run all checks for the two cases.
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Co-authored-by: Ayoub Mrini <ayoubmrini424@gmail.com> Signed-off-by: Alan Protasio <alanprot@gmail.com>
a14e433
to
37aae47
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking the unit tests after this
tsdb/head.go
Outdated
totalSeries += len(deletedForCallback) | ||
deletedFromPrevStripe = len(deletedForCallback) | ||
} | ||
s.series = make([]map[chunks.HeadSeriesRef]*memSeries, s.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My suggestion: get rid of flush()
and inline it in resetInMemoryState()
without this allocation. Or remove this line of s.series = ...
and probably rename this function to callPostDeletionForAll()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one comment on tests and the above comments, looks good otherwise. Thanks!
cbc345d
to
c19fb3c
Compare
c19fb3c
to
ad98c84
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Small nits.
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Alan Protasio <alanprot@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Alan Protasio <alanprot@gmail.com>
// and increment the series removed metrics | ||
fs := h.series.iterForDeletion(func(_ int, _ uint64, s *memSeries, flushedForCallback map[chunks.HeadSeriesRef]labels.Labels) { | ||
// All series should be flushed | ||
flushedForCallback[s.ref] = s.lset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to submit this comment that was more of a question:
I don't know if we should lock s
here, I don't know if we could have any races.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iterForDeletion
is already locking here, so should be good?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was talking about this lock
Line 1883 in e6f1f7e
series.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum..
Do you think is needed? I can add those locks but right now i dont see a case where they are racing on the reset method.
On the check function we are reading and modifying the series at the same time we are possibly appending more samples, but maybe its not the case on the reset? I can still add just to be safe though.
Thanks @alanprot, this lgtm ;) |
When loading a snapshot and encountering a corrupted chunk, we discard previously loaded series from the snapshot and resort to replaying the wall. In such cases, we were not resetting the number of series in the head, leading to double counting them.
Additionally, we did not invoke the PostDeletion hook when resetting the memory - this needs to be called as the PostCreation was called for the series which we were able to replay from the snapshot but were subsequently discarded.