Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace multiprocessing.ProcessPoolExecutor with asyncio #464

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Alcolo47
Copy link
Contributor

The main idea is this PR is to remove pending = list(pending_work) with asyncio version.
This allows starting some jobs without waiting for all work items fetch ware done from database.

@@ -74,6 +74,5 @@ def run_tests(command, timeout=None):
asyncio.set_event_loop_policy(
asyncio.WindowsProactorEventLoopPolicy())

result = asyncio.get_event_loop().run_until_complete(
_run_tests(command, timeout))
result = asyncio.run(_run_tests(command, timeout))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On my reading of the docs, using run instead of get_event_loop().run_until_complete() seems quite limiting. run doesn't allow other event loops to be running on the thread. This is probably not an issue for us currently, but I could easily imagine CR using asyncio more in the future, so using run() here feels like we're asking for trouble in the future for little or no short-term gain. Is there something I'm missing here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to follow up on this, the docs for run say:

This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

If nothing else, you're calling it in two places in this PR. This feels like something we need to address.

initargs=(config,)) as pool:

# pylint: disable=W0511
# TODO: This is not optimal. The pending-work iterable could be millions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep this comment, or some version of it. We have a different version of this same problem in your implementation: we build a list of the size of the number of pending work items, something I'd eventually like to avoid. Until we fix this, I'd like to keep a reminder around to think about it.

on_task_complete(*result)

tasks = [run_task(work_item) for work_item in pending_work]
await asyncio.gather(*tasks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about memory usage here. tasks is a list of the same size as the number of items in pending_work, potentially millions or even billions of items. When we destructure it on the call to asyncio.gather(), Python is going to create a complete copy of it into a tuple, essentially doubling the space we need for this function.

Is there some way we can tell asyncio to process everything lazily and report when they're all done?

work_item)
on_task_complete(*result)

tasks = [run_task(work_item) for work_item in pending_work]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just pool.map()? The docs make some claims about map() achieving better performance by chunking the input; we should investigate that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have replaced pending list against tasks list in memory. Where is the best ?

I think there are others problems:

  • sqlite is not under asynio then pending_work generator may blocks all async jobs.
  • Dead lock can occurs if we need to write result in db since pending_work db cursor is not ended.

@@ -97,21 +101,23 @@ class LocalExecutionEngine(ExecutionEngine):
"The local-git execution engine."

def __call__(self, pending_work, config, on_task_complete):
with multiprocessing.Pool(
asyncio.run(self._execute_pending_works(pending_work, config, on_task_complete))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comments about asyncio.run below.

@abingham
Copy link
Contributor

abingham commented Oct 2, 2019

It sounds like we're trying to solve two problems at once now. The first problem is to start processing jobs before all work-items were fetched from the database. The second is to avoid needing to have large memory allocations (e.g. the pending and tasks lists).

You mention an interesting point: sqlite is not using asyncio. Perhaps we can address both of these issues by using a database with an asyncio interface. Perhaps even aiosqlite. I would be very interested to see a solution like that, esp. if it can solve both of these problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants