Add ScraperPipeline #214

Open
emrgnt-cmplxty opened this issue Mar 26, 2024 · 1 comment

Comments

emrgnt-cmplxty (Contributor) commented Mar 26, 2024

The task for this issue is to add a pipeline that facilitates scraping of provided URLs.

The class should conform to the typical pipeline format seen elsewhere in the codebase and provide basic support for scraping the target URLs.
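For concreteness, a minimal sketch of what such a class could look like is shown below. This is only an illustration under assumptions: the ScraperPipeline name, the run method, and the use of requests as the HTTP client are placeholders, since the exact pipeline interface used elsewhere in the codebase is not reproduced in this issue.

from typing import Optional

import requests  # assumed HTTP client for this sketch; swap in the project's fetcher


class ScraperPipeline:
    """Hypothetical sketch: fetch raw page content for a list of target URLs."""

    def __init__(self, timeout: float = 10.0):
        self.timeout = timeout

    def run(self, urls: list[str]) -> dict[str, Optional[str]]:
        """Return a mapping of URL -> page body, or None when a fetch fails."""
        results: dict[str, Optional[str]] = {}
        for url in urls:
            try:
                response = requests.get(url, timeout=self.timeout)
                response.raise_for_status()
                results[url] = response.text
            except requests.RequestException:
                results[url] = None
        return results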

emrgnt-cmplxty (Contributor, Author) commented:

Example script to start from:

import json
import logging
import os

import fire
from scrapy.crawler import CrawlerProcess

from sciphi.database.content import (
    ShardContentWriter,
    ShardedContentReader,
    ShardedContentWriter,
)
from sciphi.database.search import ShardedSearchReader
from sciphi.database.utils import hash_to_shard
from sciphi.search.search_parser import (
    extract_answer_box,
    extract_knowledge_graph,
    extract_organic_results,
    extract_questions_and_answers,
    extract_related_questions,
)
from sciphi.search.splash_spider import SplashSpider

logger = logging.getLogger(__name__)

pdfminer_logger = logging.getLogger("pdfminer")
pdfminer_logger.setLevel(logging.WARNING)


def extract_all_urls_from_search_data(search_data):
    """Collect candidate URL entries from every section of a search result."""
    urls = []
    urls.extend(extract_related_questions(search_data))
    urls.extend(extract_answer_box(search_data))
    urls.extend(extract_knowledge_graph(search_data))
    urls.extend(extract_organic_results(search_data))
    urls.extend(extract_questions_and_answers(search_data))
    return urls


class Scraper:
    def __init__(
        self,
        root_path=None,
        scraped_db_rel_path="data/scraped_v2/",
        search_db_rel_path="data/search/",
        debug=False,
    ):
        if root_path is None:
            root_path = os.environ["HOME"]  # On Unix and Linux systems

        self.scraped_db_path = os.path.join(root_path, scraped_db_rel_path)
        self.search_db_path = os.path.join(root_path, search_db_rel_path)
        logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    def scrape(
        self,
        pipeline="sciphi.search.splash_spider.CollectAllPipeline",
        splash_url="http://localhost:8050",
        tag="v0",
        shard_index=0,
        limit=1_000_000,
        skip_proc_queries=True,
    ):
        logging.getLogger("scrapy").setLevel(logging.WARNING)

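        # Load previously collected search results, then work out which
        # queries and URLs still need scraping before launching the crawl.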
        with ShardedSearchReader(self.search_db_path) as search_db:
            all_queries = search_db.fetch_search_queries()
            logger.info(f"Loaded {len(all_queries)} queries to process.")

            # Create the scraped database
            with ShardedContentWriter(self.scraped_db_path) as scraped_db:
                scraped_db.create_table()

            with ShardedContentReader(self.scraped_db_path) as scraped_db:
                processed_queries = scraped_db.fetch_unique_queries()
                processed_urls = scraped_db.fetch_unique_urls()
            logger.info(f"Already processed {len(processed_queries)} queries.")

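            # Write into the shard assigned to this worker; keep only queries
            # that hash to shard_index and have not already been processed.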
            with ShardContentWriter(
                self.scraped_db_path, shard_index
            ) as shard_db:
                queries_to_process = []
                for query in all_queries:
                    if query in processed_queries and skip_proc_queries:
                        continue
                    if (
                        hash_to_shard(query, shard_db.num_shards)
                        != shard_index
                    ):
                        continue
                    queries_to_process.append(query)
                    if len(queries_to_process) >= limit:
                        break

                logger.info(
                    f"Loaded {len(queries_to_process)} queries to process."
                )
                queries_to_results = search_db.fetch_searches_by_queries(
                    queries_to_process
                )
                if len(queries_to_results) != len(queries_to_process):
                    raise Exception(
                        "Mismatch in number of queries and searches"
                    )
                logger.info(
                    f"Loaded {len(queries_to_results)} searches to process."
                )

                # Map each candidate URL back to the query that produced it,
                # skipping empty links, duplicates, and already-scraped URLs.
                urls_to_queries = {}
                for query, result in queries_to_results.items():
                    search_data = json.loads(result)
                    query_extraction = extract_all_urls_from_search_data(
                        search_data
                    )
                    for extraction in query_extraction:
                        link = extraction["link"]
                        if link == "":
                            continue
                        if link in urls_to_queries:
                            logger.warning(
                                f"Found duplicate link {link} for query {query}."
                            )
                            continue
                        if link in processed_urls:
                            logger.warning(
                                f"Found processed link {link} for query {query}."
                            )
                            continue
                        urls_to_queries[link] = query

                logger.info(f"Found {len(urls_to_queries)} URLs to process.")

                USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"

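                # Scrapy settings for the Splash-based crawl: point the spider
                # at the local Splash instance, register the provided item
                # pipeline, enable autothrottle, and route retries through the
                # custom retry middleware instead of the stock RetryMiddleware.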
                settings = {
                    "SPLASH_URL": splash_url,
                    "ITEM_PIPELINES": {
                        pipeline: 1,
                    },
                    "SCRAPED_DB_PATH": self.scraped_db_path,
                    "SHARD_INDEX": shard_index,
                    "AUTOTHROTTLE_ENABLED": True,
                    "DOWNLOAD_DELAY": 2,
                    "RETRY_ENABLED": True,
                    "RETRY_TIMES": 10,
                    "RETRY_HTTP_CODES": [
                        500,
                        502,
                        503,
                        504,
                        522,
                        524,
                        429,
                        403,
                    ],
                    "DOWNLOADER_MIDDLEWARES": {
                        "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
                        "sciphi.search.splash_spider.CustomRetryMiddleware": 550,
                    },
                    "COOKIES_ENABLED": True,
                    "CONCURRENT_REQUESTS": 4_096,
                    "LOG_LEVEL": "WARNING",
                    "AJAXCRAWL_ENABLED": True,
                    "USER_AGENT": USER_AGENT,
                }
                process = CrawlerProcess(settings)
                process.crawl(
                    SplashSpider,
                    urls=list(urls_to_queries.keys()),
                    tag=tag,
                    url_to_query_map=urls_to_queries,
                )
                process.start()

    def report(self):
        with ShardedContentReader(self.scraped_db_path) as db:
            unique_queries = db.fetch_unique_queries()
            logger.info(f"Found {len(unique_queries)} unique queries.")
            unique_urls = db.fetch_unique_urls()
            logger.info(f"Found {len(unique_urls)} unique urls.")


if __name__ == "__main__":
    fire.Fire(Scraper)

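Since the script exposes the Scraper class through fire, it would presumably be driven from the command line along these lines (the file name scrape.py is an assumption; a Splash instance must also be reachable at the configured splash_url, http://localhost:8050 by default):

    python scrape.py scrape --shard_index=0 --limit=1000
    python scrape.py report
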
emrgnt-cmplxty changed the title from "Add ScrapePipeline" to "Add ScraperPipeline" on Apr 1, 2024.