Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Fix bug: Scheduler paused forever #871

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyspider/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ def paused(self):
for _, task in self.active_tasks:
if task is self._unpause_last_seen:
break
cnt += 1
# ignore select task
if task.get('type') == self.scheduler.TASK_PACK:
continue
cnt += 1

if task['track']['process']['ok']:
# break with enough check cnt
cnt = max(cnt, self.scheduler.UNPAUSE_CHECK_NUM)
Expand Down
43 changes: 31 additions & 12 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
# http://binux.me
# Created on 2014-02-08 22:37:13

import logging.config
import os
import time
import shutil
import time

import unittest2 as unittest
import logging
import logging.config

logging.config.fileConfig("pyspider/logging.conf")

from pyspider.scheduler.task_queue import TaskQueue
Expand Down Expand Up @@ -116,14 +117,17 @@ def setUpClass(self):

def get_taskdb():
return taskdb.TaskDB(self.taskdb_path)

self.taskdb = get_taskdb()

def get_projectdb():
return projectdb.ProjectDB(self.projectdb_path)

self.projectdb = get_projectdb()

def get_resultdb():
return resultdb.ResultDB(self.resultdb_path)

self.resultdb = get_resultdb()

self.newtask_queue = Queue(10)
Expand Down Expand Up @@ -210,9 +214,9 @@ def test_32_get_info(self):
'project': 'test_project',
'track': {
'save': {
}
}
})
}
})
# test_project on_get_info {}

def test_34_new_not_used_project(self):
Expand Down Expand Up @@ -329,9 +333,8 @@ def test_60_taskdone_failed_retry(self):
},
}
}) # task retry 0/3 test_project:taskid url
from six.moves import queue as Queue
# with self.assertRaises(Queue.Empty):
# task = self.scheduler2fetcher.get(timeout=4)
# task = self.scheduler2fetcher.get(timeout=4)
task = self.scheduler2fetcher.get(timeout=5) # select test_project:taskid url
self.assertIsNotNone(task)

Expand Down Expand Up @@ -510,19 +513,19 @@ def test_a20_failed_retry(self):

def test_a30_task_verify(self):
self.assertFalse(self.rpc.newtask({
#'taskid': 'taskid#',
# 'taskid': 'taskid#',
'project': 'test_project',
'url': 'url',
})) # taskid not in task: {'project': 'test_project', 'url': 'url'}
self.assertFalse(self.rpc.newtask({
'taskid': 'taskid#',
#'project': 'test_project',
# 'project': 'test_project',
'url': 'url',
})) # project not in task: {'url': 'url', 'taskid': 'taskid#'}
self.assertFalse(self.rpc.newtask({
'taskid': 'taskid#',
'project': 'test_project',
#'url': 'url',
# 'url': 'url',
})) # url not in task: {'project': 'test_project', 'taskid': 'taskid#'}
self.assertFalse(self.rpc.newtask({
'taskid': 'taskid#',
Expand Down Expand Up @@ -665,7 +668,7 @@ def test_38_cancel_task(self):
# task_queue = [ test_project:taskid_to_cancel ]

time.sleep(0.2)
self.assertEqual(self.rpc.size(), current_size+1)
self.assertEqual(self.rpc.size(), current_size + 1)

self.newtask_queue.put({
'taskid': 'taskid_to_cancel',
Expand Down Expand Up @@ -715,7 +718,7 @@ def test_x10_inqueue_limit(self):

def test_x20_delete_project(self):
self.assertIsNotNone(self.projectdb.get('test_inqueue_project'))
#self.assertIsNotNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
# self.assertIsNotNone(self.taskdb.get_task('test_inqueue_project', 'taskid1'))
self.projectdb.update('test_inqueue_project', status="STOP", group="lock,delete")
time.sleep(1)
self.assertIsNone(self.projectdb.get('test_inqueue_project'))
Expand All @@ -738,6 +741,7 @@ def test_z20_quit(self):

from pyspider.scheduler.scheduler import Project


class TestProject(unittest.TestCase):
task_pack = {
'type': Scheduler.TASK_PACK,
Expand Down Expand Up @@ -860,6 +864,21 @@ def test_pause_70_unpaused(self):
self.assertFalse(self.project.paused)
self.assertFalse(self.project._paused)

def test_pause_80_paused_again(self):
for i in range(self.scheduler.FAIL_PAUSE_NUM):
self.project.active_tasks.appendleft((time.time(), dict(self.status_fail_pack)))
self.assertTrue(self.project.paused)

def test_pause_90_unpause_checking(self):
time.sleep(3)
self.assertFalse(self.project.paused)

def test_pause_99_unpaused(self):
for i in range(self.scheduler.ACTIVE_TASKS):
self.project.active_tasks.appendleft((time.time(), dict(self.task_pack)))
self.assertFalse(self.project.paused)
self.assertFalse(self.project._paused)

def test_pause_x_disable_auto_pause(self):
fail_pause_num = self.scheduler.FAIL_PAUSE_NUM
self.scheduler.FAIL_PAUSE_NUM = 0
Expand Down