Skip to content

Commit

Permalink
chore(core): Add directory-watching to 'watcher2'
Browse files Browse the repository at this point in the history
Description
---
Adds `WatchDir` function to the `watcher2` package, making `watcher` redundant. I will delete `watcher` after removing references in `tb.go`.

Reviewers: kptkin

Reviewed By: kptkin

Pull Request: #7596
  • Loading branch information
timoffex committed May 10, 2024
1 parent 539e17d commit 4208d23
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 27 deletions.
36 changes: 24 additions & 12 deletions core/internal/watcher2/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watcher2
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"

Expand All @@ -16,7 +17,7 @@ type watcher struct {
logger *observability.CoreLogger
delegate *poller.Watcher
wg *sync.WaitGroup
handlers map[string]func()
handlers map[string]func(string)
isFinished bool

pollingPeriod time.Duration
Expand All @@ -30,13 +31,21 @@ func newWatcher(params Params) *watcher {
return &watcher{
logger: params.Logger,
wg: &sync.WaitGroup{},
handlers: make(map[string]func()),
handlers: make(map[string]func(string)),

pollingPeriod: params.PollingPeriod,
}
}

func (w *watcher) Watch(path string, onChange func()) error {
return w.watchFileOrDir(path, func(string) { onChange() })
}

func (w *watcher) WatchDir(path string, onChange func(string)) error {
return w.watchFileOrDir(path, onChange)
}

func (w *watcher) watchFileOrDir(path string, onChange func(string)) error {
w.Lock()
defer w.Unlock()

Expand Down Expand Up @@ -80,10 +89,12 @@ func (w *watcher) startWatcher() error {
}

w.delegate = poller.New()
// Note: we only include Create because there's a bug in this dependency
// where it can sometimes emit a Create event for a file that already
// exists. This is because of a race condition between Add() and the
// polling loop in Start().
// NOTE: The "radovskyb/watcher" dependency has a bug where it sometimes
// emits 'Create' events for files that already exist because of a race
// condition between Add() and the polling loop in Start().
//
// In other words, we cannot distinguish between Write and Create events,
// which is why that's not part of this package's public interface.
w.delegate.FilterOps(poller.Write, poller.Create)

grp, ctx := errgroup.WithContext(context.Background())
Expand Down Expand Up @@ -164,14 +175,15 @@ func (w *watcher) loopWatchFiles(ctx context.Context) {
func (w *watcher) onChange(evt poller.Event) {
w.Lock()
handler := w.handlers[evt.Path]
parentHandler := w.handlers[filepath.Dir(evt.Path)]
w.Unlock()

if handler == nil {
// This shouldn't happen since we don't remove handlers,
// but we should fail gracefully just in case.
return
if handler != nil {
handler(evt.Path)
} else if parentHandler != nil {
parentHandler(evt.Path)
}

// Never hold a mutex while invoking code you don't control!
handler()
// This shouldn't happen since we don't remove handlers,
// but we should fail gracefully just in case.
}
9 changes: 9 additions & 0 deletions core/internal/watcher2/watcher2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ type Watcher interface {
// The file must exist, or an error is returned.
Watch(path string, onChange func()) error

// WatchDir begins watching the contents of the directory at the path.
//
// `onChange` is invoked with a file path if a direct child of the
// directory is changed or created. The directory is not watched
// recursively.
//
// The directory must exist, or an error is returned.
WatchDir(path string, onChange func(string)) error

// Finish stops the watcher from emitting any more change events.
Finish()
}
Expand Down
49 changes: 37 additions & 12 deletions core/internal/watcher2/watcher2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wandb/wandb/core/internal/watcher2"
)

func writeFileAndGetModTime(t *testing.T, path string, content string) time.Time {
func mkdir(t *testing.T, path string) {
require.NoError(t,
os.MkdirAll(
filepath.Dir(path),
path,
syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR,
))
}

func writeFileAndGetModTime(t *testing.T, path string, content string) time.Time {
mkdir(t, filepath.Dir(path))

require.NoError(t,
os.WriteFile(path, []byte(content), syscall.S_IRUSR|syscall.S_IWUSR))
Expand All @@ -31,6 +36,16 @@ func writeFile(t *testing.T, path string, content string) {
_ = writeFileAndGetModTime(t, path, content)
}

func waitWithDeadline[S any](t *testing.T, c <-chan S, msg string) S {
select {
case x := <-c:
return x
case <-time.After(5 * time.Second):
t.Fatal("took too long: " + msg)
panic("unreachable")
}
}

func TestWatcher(t *testing.T) {
// The watcher implementation we rely on uses `time.Sleep()`, making for
// flaky and slow tests. The tests in this function are carefully designed
Expand All @@ -54,13 +69,6 @@ func TestWatcher(t *testing.T) {
PollingPeriod: 10 * time.Millisecond,
})
}
waitWithDeadline := func(t *testing.T, c <-chan struct{}, msg string) {
select {
case <-c:
case <-time.After(5 * time.Second):
t.Fatal("took too long: " + msg)
}
}
finishWithDeadline := func(t *testing.T, w watcher2.Watcher) {
finished := make(chan struct{})

Expand All @@ -82,9 +90,7 @@ func TestWatcher(t *testing.T) {
watcher := newTestWatcher()
defer finishWithDeadline(t, watcher)
require.NoError(t,
watcher.Watch(file, func() {
onChangeChan <- struct{}{}
}))
watcher.Watch(file, func() { onChangeChan <- struct{}{} }))
time.Sleep(100 * time.Millisecond) // see below
t2 := writeFileAndGetModTime(t, file, "xyz")

Expand All @@ -104,6 +110,25 @@ func TestWatcher(t *testing.T) {
"expected file callback to be called")
})

t.Run("runs callback on new file in directory", func(t *testing.T) {
t.Parallel()

onChangeChan := make(chan string)
dir := filepath.Join(t.TempDir(), "dir")
file := filepath.Join(dir, "file.txt")
mkdir(t, dir)

watcher := newTestWatcher()
defer finishWithDeadline(t, watcher)
require.NoError(t,
watcher.WatchDir(dir, func(s string) { onChangeChan <- s }))
writeFile(t, file, "")

result := waitWithDeadline(t, onChangeChan,
"expected file callback to be called")
assert.Equal(t, result, file)
})

t.Run("fails if file does not exist", func(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 14 additions & 3 deletions core/internal/watcher2test/watcher2test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ import (
type FakeWatcher struct {
sync.Mutex

handlers map[string]func()
handlers map[string]func(string)
}

var _ watcher2.Watcher = &FakeWatcher{}

func NewFakeWatcher() *FakeWatcher {
return &FakeWatcher{
handlers: make(map[string]func()),
handlers: make(map[string]func(string)),
}
}

// OnChange invokes the change callback registered for the path, if any.
func (w *FakeWatcher) OnChange(path string) {
w.Lock()
handler := w.handlers[path]
parentHandler := w.handlers[filepath.Dir(path)]
w.Unlock()

if handler != nil {
handler()
handler(path)
} else if parentHandler != nil {
parentHandler(path)
}
}

Expand All @@ -44,6 +47,14 @@ func (w *FakeWatcher) IsWatching(path string) bool {
}

func (w *FakeWatcher) Watch(path string, callback func()) error {
return w.watchFileOrDir(path, func(string) { callback() })
}

func (w *FakeWatcher) WatchDir(path string, callback func(string)) error {
return w.watchFileOrDir(path, callback)
}

func (w *FakeWatcher) watchFileOrDir(path string, callback func(string)) error {
w.Lock()
defer w.Unlock()

Expand Down

0 comments on commit 4208d23

Please sign in to comment.