Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schedule feature and tests #655

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions colly.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ type Collector struct {
// to support clean cancellation of scraping.
Context context.Context

// A list of scheduled scraping routines.
//
// Cron expressions are parsed via package
// https://github.com/adhocore/gronx#cron-expression
schedules []schedule

store storage.Storage
debugger debug.Debugger
robotsMap map[string]*robotstxt.RobotsData
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

require (
github.com/PuerkitoBio/goquery v1.5.1
github.com/adhocore/gronx v0.2.6
github.com/andybalholm/cascadia v1.2.0 // indirect
github.com/antchfx/htmlquery v1.2.3
github.com/antchfx/xmlquery v1.3.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/PuerkitoBio/goquery v1.5.1 h1:PSPBGne8NIUWw+/7vFBV+kG2J/5MOjbzc7154OaKCSE=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/adhocore/gronx v0.2.5 h1:trVgNYPle0IcsGDt0XGz78zN8Soek0YrcQtQ+Cs+jko=
github.com/adhocore/gronx v0.2.5/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/adhocore/gronx v0.2.6 h1:bFcYDpnDOOJ+ExO9fhLjNm5sB5+U8JCOaa0SgFYWkFw=
github.com/adhocore/gronx v0.2.6/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/andybalholm/cascadia v1.2.0 h1:vuRCkM5Ozh/BfmsaTm26kbjm0mIOM3yS5Ek/F5h18aE=
github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY=
Expand Down
77 changes: 77 additions & 0 deletions schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package colly

import (
"context"

"github.com/adhocore/gronx/pkg/tasker"
)

type schedule struct {
cron string
url string
err chan error
}

var (
scheduleCtx context.Context
scheduleCancel context.CancelFunc
)

// Schedule adds a new item to the schedule list
// eg. Visit example.com/home every 5 hours
// c.Schedule("* */5 * * *", "https://example.com/home")
//
// Cron expressions are parsed via package
// https://github.com/adhocore/gronx#cron-expression
func (c *Collector) Schedule(expr, u string) chan (error) {
ch := make(chan error)
c.schedules = append(c.schedules, schedule{cron: expr, url: u, err: ch})
return ch
}

// StartSchedules will begin each listed schedules.
// Collector.Context can be used to cancel the entire
// list of schedules.
func (c *Collector) StartSchedules() {
if len(c.schedules) == 0 {
return
}

var ctx context.Context
if c.Context != nil {
ctx = c.Context
} else {
ctx = context.Background()
}

scheduleCtx, scheduleCancel = context.WithCancel(ctx)
taskr := tasker.New(tasker.Option{}).WithContext(scheduleCtx)
go func() {
for _, s := range c.schedules {
taskr.Task(s.cron, func(ctx context.Context) (int, error) {
s.err <- c.Visit(s.url)
return 0, nil
})
}
taskr.Run()
}()
}

// StartSchedulesWait will begin each listed schedules
// and will block until context cancelled
//
// Does not block for all currently running schedules to finish
func (c *Collector) StartSchedulesWait() {
c.StartSchedules()
<-scheduleCtx.Done()
}

// StopSchedules will cancel the underlying context and returns
// blocking operations like StartSchedulesWait
//
// An alternative would be cancelling Collector.Context
func (c *Collector) StopSchedules() {
if scheduleCancel != nil {
scheduleCancel()
}
}
67 changes: 67 additions & 0 deletions schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package colly

import (
"testing"
"time"
)

func TestStartSchedules(t *testing.T) {
ts := newTestServer()
defer ts.Close()

c := NewCollector()
visited := make(chan bool, 1)
c.OnRequest(func(r *Request) {
visited <- true
})

// start schedule on-the-minute
c.Schedule("@always", ts.URL)
c.StartSchedules()

sec := 65 - time.Now().Second()
tick := time.NewTimer(time.Duration(sec) * time.Second)
L:
for {
select {
case <-visited:
break L
case <-tick.C:
t.Errorf("Schedule failed to start after %d sec", sec)
default:
time.Sleep(200 * time.Millisecond)
}
}
}

func TestStartSchedulesWaitAndStop(t *testing.T) {
ts := newTestServer()
defer ts.Close()

c := NewCollector()
visited := make(chan bool, 1)
c.OnRequest(func(r *Request) {
visited <- true
})

go func() {
sec := 65 - time.Now().Second()
tick := time.NewTimer(time.Duration(sec) * time.Second)
L:
for {
select {
case <-visited:
c.StopSchedules()
break L
case <-tick.C:
t.Errorf("Schedule failed to start/stop after %d sec", sec)
default:
time.Sleep(200 * time.Millisecond)
}
}
}()

// start schedule on-the-minute
c.Schedule("@always", ts.URL)
c.StartSchedulesWait()
}