Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): Add directory-watching to 'watcher2' #7596

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 24 additions & 12 deletions core/internal/watcher2/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watcher2
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"

Expand All @@ -16,7 +17,7 @@ type watcher struct {
logger *observability.CoreLogger
delegate *poller.Watcher
wg *sync.WaitGroup
handlers map[string]func()
handlers map[string]func(string)
isFinished bool

pollingPeriod time.Duration
Expand All @@ -30,13 +31,21 @@ func newWatcher(params Params) *watcher {
return &watcher{
logger: params.Logger,
wg: &sync.WaitGroup{},
handlers: make(map[string]func()),
handlers: make(map[string]func(string)),

pollingPeriod: params.PollingPeriod,
}
}

// Watch delegates to watchFileOrDir, which takes a handler that receives
// the changed path.
//
// onChange takes no arguments, so it is wrapped in an adapter that discards
// the path. The error from watchFileOrDir is returned unchanged.
func (w *watcher) Watch(path string, onChange func()) error {
	ignorePath := func(string) { onChange() }
	return w.watchFileOrDir(path, ignorePath)
}

// WatchDir passes path and onChange straight through to watchFileOrDir and
// returns its error. No adapter is needed because onChange already accepts
// the changed path.
func (w *watcher) WatchDir(path string, onChange func(string)) error {
return w.watchFileOrDir(path, onChange)
}

func (w *watcher) watchFileOrDir(path string, onChange func(string)) error {
w.Lock()
defer w.Unlock()

Expand Down Expand Up @@ -80,10 +89,12 @@ func (w *watcher) startWatcher() error {
}

w.delegate = poller.New()
// Note: we only include Create because there's a bug in this dependency
// where it can sometimes emit a Create event for a file that already
// exists. This is because of a race condition between Add() and the
// polling loop in Start().
// NOTE: The "radovskyb/watcher" dependency has a bug where it sometimes
// emits 'Create' events for files that already exist because of a race
// condition between Add() and the polling loop in Start().
//
// In other words, we cannot distinguish between Write and Create events,
// which is why that distinction is not part of this package's public
// interface.
w.delegate.FilterOps(poller.Write, poller.Create)

grp, ctx := errgroup.WithContext(context.Background())
Expand Down Expand Up @@ -164,14 +175,15 @@ func (w *watcher) loopWatchFiles(ctx context.Context) {
func (w *watcher) onChange(evt poller.Event) {
w.Lock()
handler := w.handlers[evt.Path]
parentHandler := w.handlers[filepath.Dir(evt.Path)]
w.Unlock()

if handler == nil {
// This shouldn't happen since we don't remove handlers,
// but we should fail gracefully just in case.
return
if handler != nil {
handler(evt.Path)
} else if parentHandler != nil {
parentHandler(evt.Path)
}

// Never hold a mutex while invoking code you don't control!
handler()
// If neither handler is registered, do nothing. This shouldn't happen
// since we don't remove handlers, but we should fail gracefully just in case.
}
9 changes: 9 additions & 0 deletions core/internal/watcher2/watcher2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ type Watcher interface {
// The file must exist, or an error is returned.
Watch(path string, onChange func()) error

// WatchDir begins watching the contents of the directory at the path.
//
// `onChange` is invoked with a file path if a direct child of the
// directory is changed or created. The directory is not watched
// recursively.
//
// The directory must exist, or an error is returned.
WatchDir(path string, onChange func(string)) error

// Finish stops the watcher from emitting any more change events.
Finish()
}
Expand Down
49 changes: 37 additions & 12 deletions core/internal/watcher2/watcher2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wandb/wandb/core/internal/watcher2"
)

func writeFileAndGetModTime(t *testing.T, path string, content string) time.Time {
func mkdir(t *testing.T, path string) {
require.NoError(t,
os.MkdirAll(
filepath.Dir(path),
path,
syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR,
))
}

func writeFileAndGetModTime(t *testing.T, path string, content string) time.Time {
mkdir(t, filepath.Dir(path))

require.NoError(t,
os.WriteFile(path, []byte(content), syscall.S_IRUSR|syscall.S_IWUSR))
Expand All @@ -31,6 +36,16 @@ func writeFile(t *testing.T, path string, content string) {
_ = writeFileAndGetModTime(t, path, content)
}

// waitWithDeadline receives a single value from c and returns it.
//
// If no value can be received within 5 seconds, the test is failed
// immediately with a "took too long" message that includes msg.
func waitWithDeadline[S any](t *testing.T, c <-chan S, msg string) S {
	// Attribute a timeout failure to the calling test's line rather than
	// to this helper.
	t.Helper()

	select {
	case x := <-c:
		return x
	case <-time.After(5 * time.Second):
		t.Fatal("took too long: " + msg)
		// t.Fatal does not return, but the compiler cannot know that and
		// still requires a terminating statement on this path.
		panic("unreachable")
	}
}

func TestWatcher(t *testing.T) {
// The watcher implementation we rely on uses `time.Sleep()`, making for
// flaky and slow tests. The tests in this function are carefully designed
Expand All @@ -54,13 +69,6 @@ func TestWatcher(t *testing.T) {
PollingPeriod: 10 * time.Millisecond,
})
}
waitWithDeadline := func(t *testing.T, c <-chan struct{}, msg string) {
select {
case <-c:
case <-time.After(5 * time.Second):
t.Fatal("took too long: " + msg)
}
}
finishWithDeadline := func(t *testing.T, w watcher2.Watcher) {
finished := make(chan struct{})

Expand All @@ -82,9 +90,7 @@ func TestWatcher(t *testing.T) {
watcher := newTestWatcher()
defer finishWithDeadline(t, watcher)
require.NoError(t,
watcher.Watch(file, func() {
onChangeChan <- struct{}{}
}))
watcher.Watch(file, func() { onChangeChan <- struct{}{} }))
time.Sleep(100 * time.Millisecond) // see below
t2 := writeFileAndGetModTime(t, file, "xyz")

Expand All @@ -104,6 +110,25 @@ func TestWatcher(t *testing.T) {
"expected file callback to be called")
})

t.Run("runs callback on new file in directory", func(t *testing.T) {
	t.Parallel()

	onChangeChan := make(chan string)
	dir := filepath.Join(t.TempDir(), "dir")
	file := filepath.Join(dir, "file.txt")
	mkdir(t, dir)

	watcher := newTestWatcher()
	defer finishWithDeadline(t, watcher)
	require.NoError(t,
		watcher.WatchDir(dir, func(s string) { onChangeChan <- s }))

	// Create a direct child of the watched directory; the callback is
	// expected to receive that child's path.
	writeFile(t, file, "")

	result := waitWithDeadline(t, onChangeChan,
		"expected file callback to be called")
	// testify takes (expected, actual); this order keeps failure output
	// labelled correctly.
	assert.Equal(t, file, result)
})

t.Run("fails if file does not exist", func(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 14 additions & 3 deletions core/internal/watcher2test/watcher2test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ import (
type FakeWatcher struct {
sync.Mutex

handlers map[string]func()
handlers map[string]func(string)
}

var _ watcher2.Watcher = &FakeWatcher{}

func NewFakeWatcher() *FakeWatcher {
return &FakeWatcher{
handlers: make(map[string]func()),
handlers: make(map[string]func(string)),
}
}

// OnChange invokes the change callback registered for the path, if any,
// falling back to the callback registered for the path's parent directory.
func (w *FakeWatcher) OnChange(path string) {
w.Lock()
handler := w.handlers[path]
parentHandler := w.handlers[filepath.Dir(path)]
w.Unlock()

if handler != nil {
handler()
handler(path)
} else if parentHandler != nil {
parentHandler(path)
}
}

Expand All @@ -44,6 +47,14 @@ func (w *FakeWatcher) IsWatching(path string) bool {
}

// Watch delegates to watchFileOrDir, which takes a callback that receives
// the changed path.
//
// callback takes no arguments, so it is wrapped in an adapter that discards
// the path. The error from watchFileOrDir is returned unchanged.
func (w *FakeWatcher) Watch(path string, callback func()) error {
	ignorePath := func(string) { callback() }
	return w.watchFileOrDir(path, ignorePath)
}

// WatchDir passes path and callback straight through to watchFileOrDir and
// returns its error. No adapter is needed because callback already accepts
// the changed path.
func (w *FakeWatcher) WatchDir(path string, callback func(string)) error {
return w.watchFileOrDir(path, callback)
}

func (w *FakeWatcher) watchFileOrDir(path string, callback func(string)) error {
w.Lock()
defer w.Unlock()

Expand Down