New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ScraperPipeline #214
Comments
Example script to start from:

import json
import logging
import os
import fire
from scrapy.crawler import CrawlerProcess
from sciphi.database.content import (
ShardContentWriter,
ShardedContentReader,
ShardedContentWriter,
)
from sciphi.database.search import ShardedSearchReader
from sciphi.database.utils import hash_to_shard
from sciphi.search.search_parser import (
extract_answer_box,
extract_knowledge_graph,
extract_organic_results,
extract_questions_and_answers,
extract_related_questions,
)
from sciphi.search.splash_spider import SplashSpider
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)
# pdfminer (presumably pulled in downstream during content extraction — not
# visible in this file; TODO confirm) is very chatty below WARNING, so cap it.
pdfminer_logger = logging.getLogger("pdfminer")
pdfminer_logger.setLevel(logging.WARNING)
def extract_all_urls_from_search_data(search_data):
    """Gather URL records from every section of a parsed search payload.

    Runs each section extractor (related questions, answer box, knowledge
    graph, organic results, Q&A) over *search_data* and concatenates their
    results, preserving extractor order.
    """
    extractors = (
        extract_related_questions,
        extract_answer_box,
        extract_knowledge_graph,
        extract_organic_results,
        extract_questions_and_answers,
    )
    collected = []
    for extract in extractors:
        collected += extract(search_data)
    return collected
class Scraper:
    """Drives scraping of URLs extracted from previously stored search results.

    Reads queries and raw search payloads from a sharded search database,
    extracts candidate URLs from each payload, and feeds them to a
    Splash-backed Scrapy crawl whose item pipeline persists the scraped
    content into a sharded content database.
    """

    def __init__(
        self,
        root_path=None,
        scraped_db_rel_path="data/scraped_v2/",
        search_db_rel_path="data/search/",
        debug=False,
    ):
        """Resolve database paths and configure logging.

        Args:
            root_path: Base directory for both databases. Defaults to the
                user's home directory ($HOME, Unix/Linux only).
            scraped_db_rel_path: Path of the scraped-content DB, relative
                to ``root_path``.
            search_db_rel_path: Path of the search-results DB, relative
                to ``root_path``.
            debug: When True, log at DEBUG level instead of INFO.
        """
        if root_path is None:
            root_path = os.environ["HOME"]  # On Unix and Linux systems
        self.scraped_db_path = os.path.join(root_path, scraped_db_rel_path)
        self.search_db_path = os.path.join(root_path, search_db_rel_path)
        logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    def scrape(
        self,
        pipeline="sciphi.search.splash_spider.CollectAllPipeline",
        splash_url="http://localhost:8050",
        tag="v0",
        shard_index=0,
        limit=1_000_000,
        skip_proc_queries=True,
    ):
        """Scrape the URLs referenced by this shard's stored search results.

        Args:
            pipeline: Dotted path of the Scrapy item pipeline to run.
            splash_url: URL of the Splash rendering service.
            tag: Version tag passed through to the spider's items.
            shard_index: Index of the content-DB shard this run owns; only
                queries hashing to this shard are processed.
            limit: Maximum number of queries to process in this run.
            skip_proc_queries: When True, skip queries already present in
                the scraped database.

        Raises:
            ValueError: If the search DB returns a different number of
                results than the number of queries requested.
        """
        # Scrapy is very verbose at INFO level.
        logging.getLogger("scrapy").setLevel(logging.WARNING)
        with ShardedSearchReader(self.search_db_path) as search_db:
            all_queries = search_db.fetch_search_queries()
            logger.info(f"Loaded {len(all_queries)} queries to process.")
            # Ensure the scraped-content tables exist before reading them.
            with ShardedContentWriter(self.scraped_db_path) as scraped_db:
                scraped_db.create_table()
            with ShardedContentReader(self.scraped_db_path) as scraped_db:
                processed_queries = scraped_db.fetch_unique_queries()
                processed_urls = scraped_db.fetch_unique_urls()
                logger.info(f"Already processed {len(processed_queries)} queries.")
                with ShardContentWriter(
                    self.scraped_db_path, shard_index
                ) as shard_db:
                    queries_to_process = self._select_queries(
                        all_queries,
                        processed_queries,
                        shard_db.num_shards,
                        shard_index,
                        limit,
                        skip_proc_queries,
                    )
                    logger.info(
                        f"Loaded {len(queries_to_process)} queries to process."
                    )
                    queries_to_results = search_db.fetch_searches_by_queries(
                        queries_to_process
                    )
                    if len(queries_to_results) != len(queries_to_process):
                        # Every requested query must have a stored search
                        # result; a mismatch means the DBs are inconsistent.
                        raise ValueError(
                            "Mismatch in number of queries and searches"
                        )
                    logger.info(
                        f"Loaded {len(queries_to_results)} searches to process."
                    )
                    urls_to_queries = self._collect_urls(
                        queries_to_results, processed_urls
                    )
                    logger.info(f"Found {len(urls_to_queries)} URLs to process.")
                    settings = self._build_settings(
                        pipeline, splash_url, shard_index
                    )
                    process = CrawlerProcess(settings)
                    process.crawl(
                        SplashSpider,
                        urls=list(urls_to_queries.keys()),
                        tag=tag,
                        url_to_query_map=urls_to_queries,
                    )
                    process.start()

    def _select_queries(
        self,
        all_queries,
        processed_queries,
        num_shards,
        shard_index,
        limit,
        skip_proc_queries,
    ):
        """Pick up to ``limit`` queries that belong to this shard.

        Preserves the order of ``all_queries``; optionally skips queries
        already present in the scraped DB.
        """
        selected = []
        for query in all_queries:
            if skip_proc_queries and query in processed_queries:
                continue
            if hash_to_shard(query, num_shards) != shard_index:
                continue
            selected.append(query)
            if len(selected) >= limit:
                break
        return selected

    def _collect_urls(self, queries_to_results, processed_urls):
        """Map each new, non-empty URL to the query that produced it.

        Note: the original implementation contained an unreachable
        duplicate-link warning (guarded by a condition that already
        excluded duplicates); duplicates are now skipped silently, which
        matches the original's effective behavior.
        """
        urls_to_queries = {}
        for query, result in queries_to_results.items():
            search_data = json.loads(result)
            query_extraction = extract_all_urls_from_search_data(search_data)
            for extraction in query_extraction:
                link = extraction["link"]
                # Skip empty links and links already claimed by an earlier
                # query (first query wins).
                if link == "" or link in urls_to_queries:
                    continue
                # Skip links scraped in a previous run.
                if link in processed_urls:
                    logger.warning(
                        f"Found processed link {link} for query {query}."
                    )
                    continue
                urls_to_queries[link] = query
        return urls_to_queries

    def _build_settings(self, pipeline, splash_url, shard_index):
        """Assemble the Scrapy settings dict for a resilient Splash crawl."""
        user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/58.0.3029.110 Safari/537.36"
        )
        return {
            "SPLASH_URL": splash_url,
            "ITEM_PIPELINES": {
                pipeline: 1,
            },
            "SCRAPED_DB_PATH": self.scraped_db_path,
            "SHARD_INDEX": shard_index,
            "AUTOTHROTTLE_ENABLED": True,
            "DOWNLOAD_DELAY": 2,
            "RETRY_ENABLED": True,
            "RETRY_TIMES": 10,
            # Transient server errors plus rate-limit (429) / forbidden (403).
            "RETRY_HTTP_CODES": [
                500,
                502,
                503,
                504,
                522,
                524,
                429,
                403,
            ],
            "DOWNLOADER_MIDDLEWARES": {
                # Replace the stock retry middleware with the project's own.
                "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
                "sciphi.search.splash_spider.CustomRetryMiddleware": 550,
            },
            "COOKIES_ENABLED": True,
            "CONCURRENT_REQUESTS": 4_096,
            "LOG_LEVEL": "WARNING",
            "AJAXCRAWL_ENABLED": True,
            "USER_AGENT": user_agent,
        }

    def report(
        self,
    ):
        """Log counts of unique queries and URLs already in the scraped DB."""
        with ShardedContentReader(self.scraped_db_path) as db:
            unique_queries = db.fetch_unique_queries()
            logger.info(f"Found {len(unique_queries)} unique queries.")
            unique_urls = db.fetch_unique_urls()
            logger.info(f"Found {len(unique_urls)} unique urls.")
if __name__ == "__main__":
    # Expose the Scraper class as a command-line interface via python-fire.
    # (Removed a stray trailing "|" artifact that made this line a syntax error.)
    fire.Fire(Scraper)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The task for this issue is to add a pipeline which facilitates scraping of provided URLs.
The class should conform to the typical pipeline format seen elsewhere in the codebase and should provide basic support for scraping target URLs.
The text was updated successfully, but these errors were encountered: