Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

分布式redis爬虫缓存清理支持 #892

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

qingmo
Copy link

@qingmo qingmo commented Sep 25, 2019

鉴于目前webmagic主要参考与scrapy,我找到了scrapy的redis分布式方案scrapy-redis.
通过对比当前webmagic源码与scrapy-redis源码发现,RedisScheduler中缺少对于queue的清理。
以scrapy-redis中的源码为例
scrapy-redis/src/scrapy-redis/scheduler.py

...
class Scheduler(object):
...
def open(self, spider):
...
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

def close(self, reason):
if not self.persist:
self.flush()

def flush(self):
self.df.clear()
self.queue.clear()
scrapy-redis/src/scrapy_redis/queue.py

class Base(object):
...
def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)
...
scrapy-redis/src/scrapy_redis/dupefilter.py

class RFPDupeFilter(BaseDupeFilter):
...
def close(self, reason=''):
"""Delete data on close. Called by Scrapy's scheduler.
Parameters
----------
reason : str, optional
"""
self.clear()

def clear(self):
    """Clears fingerprints data."""
    self.server.delete(self.key)
...

个人对python不熟悉,简单理解了一下,flush_on_start可以设置为启动的时候对当前实例执行flush清理。或者使用者可以主动调用flush()根据自己的情况进行清理
这里的清理是对key的完整清理。

反观Webmagic这边
webmagic/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java

public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {

private static final String QUEUE_PREFIX = "queue_";

private static final String SET_PREFIX = "set_";

private static final String ITEM_PREFIX = "item_";

...
@Override
public void resetDuplicateCheck(Task task) {
    Jedis jedis = pool.getResource();
    try {
        jedis.del(getSetKey(task));
    } finally {
        pool.returnResource(jedis);
    }
}
...
protected String getSetKey(Task task) {
    return SET_PREFIX + task.getUUID();
}

protected String getQueueKey(Task task) {
    return QUEUE_PREFIX + task.getUUID();
}

protected String getItemKey(Task task)
{
    return ITEM_PREFIX + task.getUUID();
}
...

这里只有DuplicateRemover接口中定义了一个resetDuplicateCheck方法对set_这个key进行了清理动作。
场景举例:目前有多台爬虫机器,一个redis服务。如果这个爬虫集群处理完第一批数据后,理论上来说第
二批数据属于主观需要抓取的,无论是与第一批数据是否重复。那么这里就需要清理之前的queue。
但是实际上的效果是,目前只能调用resetDuplicateCheck进行排除重复。但是随着数据量的持续上升
item_TASKID的队列一直没有得到清理。如下图所示:
redis内存使用(图太久了已裂)
实际情况(图太久了已裂)
其中set_可以得到清理。

add flush function to clean redis data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant