Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make RequestQueue v2 the default queue, see more on [Apify blog](https://blog.apify.com/new-apify-request-queue/) #2390

Merged
merged 13 commits into from
May 14, 2024
15 changes: 15 additions & 0 deletions docs/experiments/request_locking.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ description: Parallelize crawlers with ease using request locking

import ApiLink from '@site/src/components/ApiLink';

:::tip Release announcement

As of **May 2024** (`crawlee` version `3.10.0`), this experiment is now enabled by default! With that said, if you encounter issues you can:

- set `requestLocking` to `false` in the `experiments` object of your crawler options
- update all imports of `RequestQueue` to `RequestQueueV1`
- open an issue on our [GitHub repository](https://github.com/apify/crawlee)

The content below is kept for documentation purposes.
If you're interested in the changes, you can read the [blog post about the new Request Queue storage system on the Apify blog](https://blog.apify.com/new-apify-request-queue/).

:::

---

:::caution

This is an experimental feature. While we welcome testers, keep in mind that it is currently not recommended to use this in production.
Expand Down
26 changes: 9 additions & 17 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import {
mergeCookies,
NonRetryableError,
purgeDefaultStorages,
RequestQueueV1,
RequestQueue,
RequestQueueV2,
RequestState,
RetryRequestError,
Router,
Expand Down Expand Up @@ -356,8 +356,10 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
*/
export interface CrawlerExperiments {
/**
* Enables the use of the new RequestQueue API, which allows multiple clients to use the same queue,
* by locking the requests they are processing for a period of time.
* @deprecated This experiment is now enabled by default, and this flag will be removed in a future release.
* If you encounter issues due to this change, please:
* - report it to us: https://github.com/apify/crawlee
* - set `requestLocking` to `false` in the `experiments` option of the crawler
*/
requestLocking?: boolean;
}
Expand Down Expand Up @@ -592,14 +594,6 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.domainAccessedTime = new Map();
this.experiments = experiments;

if (requestQueue && requestQueue instanceof RequestQueueV2 && !experiments.requestLocking) {
throw new Error([
'You provided the new RequestQueue v2 class into your crawler without enabling the experiment!',
"If you're sure you want to test out the new experimental RequestQueue v2, please provide `experiments: { requestLocking: true }` "
+ 'in your crawler options, and try again.',
].join('\n'));
}

this._handlePropertyNameChange({
newName: 'requestHandler',
oldName: 'handleRequestFunction',
Expand Down Expand Up @@ -1567,16 +1561,14 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
}

private async _getRequestQueue() {
if (this.experiments.requestLocking) {
// Check if it's explicitly disabled
if (this.experiments.requestLocking === false) {
if (!this._experimentWarnings.requestLocking) {
this.log.warning([
'The RequestQueue v2 is an experimental feature, and may have issues when used in a production environment.',
'Please report any issues you encounter on GitHub: https://github.com/apify/crawlee',
].join('\n'));
this.log.info('Using the old RequestQueue implementation without request locking.');
this._experimentWarnings.requestLocking = true;
}

return RequestQueueV2.open(null, { config: this.config });
return RequestQueueV1.open(null, { config: this.config });
}

return RequestQueue.open(null, { config: this.config });
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/storages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ export * from './dataset';
export * from './key_value_store';
export * from './request_list';
export * from './request_provider';
export * from './request_queue';
export * from './request_queue_v2';
export { RequestQueueV1 } from './request_queue';
export { RequestQueue } from './request_queue_v2';
export { RequestQueue as RequestQueueV2 } from './request_queue_v2';
export * from './storage_manager';
export * from './utils';
export * from './access_checking';
17 changes: 17 additions & 0 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,23 @@ export abstract class RequestProvider implements IStorage {
return new Request(requestOptions as unknown as RequestOptions);
}

/**
* Returns a next request in the queue to be processed, or `null` if there are no more pending requests.
*
* Once you successfully finish processing of the request, you need to call
* {@apilink RequestQueue.markRequestHandled}
* to mark the request as handled in the queue. If there was some error in processing the request,
* call {@apilink RequestQueue.reclaimRequest} instead,
* so that the queue will give the request to some other consumer in another call to the `fetchNextRequest` function.
*
* Note that the `null` return value doesn't mean the queue processing finished,
* it means there are currently no pending requests.
* To check whether all requests in queue were finished,
* use {@apilink RequestQueue.isFinished} instead.
*
* @returns
* Returns the request object or `null` if there are no more pending requests.
*/
abstract fetchNextRequest<T extends Dictionary = Dictionary>(options?: RequestOptions): Promise<Request<T> | null>;

/**
Expand Down
28 changes: 27 additions & 1 deletion packages/core/src/storages/request_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ const RECENTLY_HANDLED_CACHE_SIZE = 1000;
* await queue.addRequest({ url: 'http://example.com/foo/bar' }, { forefront: true });
* ```
* @category Sources
*
* @deprecated RequestQueue v1 is deprecated and will be removed in the future. Please use {@apilink RequestQueue} instead.
*/
export class RequestQueue extends RequestProvider {
class RequestQueue extends RequestProvider {
private queryQueueHeadPromise?: Promise<{
wasLimitReached: boolean;
prevLimit: number;
Expand Down Expand Up @@ -327,6 +329,12 @@ export class RequestQueue extends RequestProvider {
return super.markRequestHandled(...args);
}

/**
* Reclaims a failed request back to the queue, so that it can be returned for processing later again
* by another call to {@apilink RequestQueue.fetchNextRequest}.
* The request record in the queue is updated using the provided `request` parameter.
* For example, this lets you store the number of retries or error messages for the request.
*/
override async reclaimRequest(...args: Parameters<RequestProvider['reclaimRequest']>) {
checkStorageAccess();

Expand Down Expand Up @@ -359,7 +367,25 @@ export class RequestQueue extends RequestProvider {
this.lastActivity = new Date();
}

/**
* Opens a request queue and returns a promise resolving to an instance
* of the {@apilink RequestQueue} class.
*
* {@apilink RequestQueue} represents a queue of URLs to crawl, which is stored either on local filesystem or in the cloud.
* The queue is used for deep crawling of websites, where you start with several URLs and then
* recursively follow links to other pages. The data structure supports both breadth-first
* and depth-first crawling orders.
*
* For more details and code examples, see the {@apilink RequestQueue} class.
*
* @param [queueIdOrName]
* ID or name of the request queue to be opened. If `null` or `undefined`,
* the function returns the default request queue associated with the crawler run.
* @param [options] Open Request Queue options.
*/
static override async open(...args: Parameters<typeof RequestProvider.open>): Promise<RequestQueue> {
return super.open(...args) as Promise<RequestQueue>;
}
}

export { RequestQueue as RequestQueueV1 };
60 changes: 42 additions & 18 deletions packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,41 @@ const MAX_CACHED_REQUESTS = 2_000_000;
*/
const RECENTLY_HANDLED_CACHE_SIZE = 1000;

class RequestQueue extends RequestProvider {
/**
* Represents a queue of URLs to crawl, which is used for deep crawling of websites
* where you start with several URLs and then recursively
* follow links to other pages. The data structure supports both breadth-first and depth-first crawling orders.
*
* Each URL is represented using an instance of the {@apilink Request} class.
* The queue can only contain unique URLs. More precisely, it can only contain {@apilink Request} instances
* with distinct `uniqueKey` properties. By default, `uniqueKey` is generated from the URL, but it can also be overridden.
* To add a single URL multiple times to the queue,
* corresponding {@apilink Request} objects will need to have different `uniqueKey` properties.
*
* Do not instantiate this class directly, use the {@apilink RequestQueue.open} function instead.
*
* `RequestQueue` is used by {@apilink BasicCrawler}, {@apilink CheerioCrawler}, {@apilink PuppeteerCrawler}
* and {@apilink PlaywrightCrawler} as a source of URLs to crawl.
* Unlike {@apilink RequestList}, `RequestQueue` supports dynamic adding and removing of requests.
* On the other hand, the queue is not optimized for operations that add or remove a large number of URLs in a batch.
*
* **Example usage:**
*
* ```javascript
* // Open the default request queue associated with the crawler run
* const queue = await RequestQueue.open();
*
* // Open a named request queue
* const queueWithName = await RequestQueue.open('some-name');
*
* // Enqueue few requests
* await queue.addRequest({ url: 'http://example.com/aaa' });
* await queue.addRequest({ url: 'http://example.com/bbb' });
* await queue.addRequest({ url: 'http://example.com/foo/bar' }, { forefront: true });
* ```
* @category Sources
*/
export class RequestQueue extends RequestProvider {
private _listHeadAndLockPromise: Promise<void> | null = null;

constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) {
Expand Down Expand Up @@ -63,21 +97,7 @@ class RequestQueue extends RequestProvider {
}

/**
* Returns a next request in the queue to be processed, or `null` if there are no more pending requests.
*
* Once you successfully finish processing of the request, you need to call
* {@apilink RequestQueue.markRequestHandled}
* to mark the request as handled in the queue. If there was some error in processing the request,
* call {@apilink RequestQueue.reclaimRequest} instead,
* so that the queue will give the request to some other consumer in another call to the `fetchNextRequest` function.
*
* Note that the `null` return value doesn't mean the queue processing finished,
* it means there are currently no pending requests.
* To check whether all requests in queue were finished,
* use {@apilink RequestQueue.isFinished} instead.
*
* @returns
* Returns the request object or `null` if there are no more pending requests.
* @inheritDoc
*/
override async fetchNextRequest<T extends Dictionary = Dictionary>(): Promise<Request<T> | null> {
checkStorageAccess();
Expand Down Expand Up @@ -143,6 +163,9 @@ class RequestQueue extends RequestProvider {
return request;
}

/**
* @inheritDoc
*/
override async reclaimRequest(...args: Parameters<RequestProvider['reclaimRequest']>): ReturnType<RequestProvider['reclaimRequest']> {
checkStorageAccess();

Expand Down Expand Up @@ -350,9 +373,10 @@ class RequestQueue extends RequestProvider {
}
}

/**
* @inheritDoc
*/
static override async open(...args: Parameters<typeof RequestProvider.open>): Promise<RequestQueue> {
return super.open(...args) as Promise<RequestQueue>;
}
}

export { RequestQueue as RequestQueueV2 };
B4nan marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion test/core/storages/request_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
QUERY_HEAD_MIN_LENGTH,
API_PROCESSED_REQUESTS_DELAY_MILLIS,
STORAGE_CONSISTENCY_DELAY_MILLIS,
RequestQueue,
RequestQueueV1 as RequestQueue,
Request,
Configuration,
ProxyConfiguration,
Expand Down