Allow Pipelines to be run/reused in "SuperPipelines" #7638

Open
mikebellerU opened this issue May 2, 2024 · 5 comments
Labels: P2 Medium priority, add to the next sprint if no P1 available

@mikebellerU commented May 2, 2024

Hi -- I have been using Haystack to build out some complicated RAG pipelines. They are too
complicated to build in a single Pipeline. I would like to be able to "compose" sub-pipelines
together. This would allow for building complex pipelines from smaller ones, and would also
allow for reuse of smaller pipelines in various ways.

Here is a trivial example of what I'd like to be able to do. In a real use case the
sub-pipelines p1 and p2 would of course be larger and more complicated, and do something useful!

from haystack import Pipeline
from haystack.components.others.pipeline import PipelineComponent
from haystack.components.converters import OutputAdapter

p1 = Pipeline()
p1.add_component("adap", OutputAdapter(
    template="Hello {{inp}}", output_type=str))
p2 = Pipeline()
p2.add_component("adap", OutputAdapter(
    template="Goodbye {{inp}}", output_type=str))
p = Pipeline()
p.add_component("pipeline1", PipelineComponent(p1))
p.add_component("pipeline2", PipelineComponent(p2))
p.connect("pipeline1.adap:output", "pipeline2.adap:inp")
print(p.run(data={"pipeline1": {"adap:inp": "Paris"}}))

Notes:

  • The PipelineComponent idea is from a Discord discussion with @masci -- here is his POC
    branch (8e455f6).
  • The branch doesn't run anymore, for reasons discussed in the Discord.
  • The naming used (pipeline1.adap:output) to address the hierarchy of data is just one idea,
    proposed by @masci, but it seems a reasonable choice.

Alternatives Considered:

  • Tried just making bigger and bigger pipelines. They become unwieldy and difficult to test.
  • Tried creating my own PipelineComponent, but it becomes hard to move the data around,
    because each level of Pipelines in the hierarchy adds a {'data': {...}} wrapper (see the
    sketch below). Soon my brain melts.
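
To make the nesting concrete, here is a minimal sketch of how the input dict grows one wrapper
level at a time ("pipeline1" and "super" are hypothetical wrapper names, not an existing API):

# Each hand-rolled wrapper level adds another {'data': {...}} layer around the inner inputs.
inner_inputs = {"adap": {"inp": "Paris"}}             # what p1.run(data=...) expects
outer_inputs = {"pipeline1": {"data": inner_inputs}}  # one wrapper level deep
super_inputs = {"super": {"data": outer_inputs}}      # two levels deep, and so on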

Additional context

Some amazing things this could enable: what if we had a ParallelPipelineComponent that could run
multiple copies of the same Pipeline in parallel, using a ThreadPoolExecutor or Dask/Ray/something!
It would be fairly easy to do, I think, once we had PipelineComponent.

@masci (Member) commented May 3, 2024

@vblagoje for visibility

@Redna commented May 14, 2024

I did an implementation of the ParallelPipelineComponent idea some time ago. Maybe you can reuse something from it:

https://github.com/Redna/haystack-extensions/blob/main/src/haystack_extensions/components/concurrent_runner/runner.py

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from haystack import component

# NamedPipeline (a name + Pipeline pair) is defined alongside this component in the linked file.


@component
class ConcurrentPipelineRunner:
    """
    This component allows you to run multiple pipelines concurrently in a thread pool.
    """

    def __init__(self, named_pipelines: List[NamedPipeline], executor: Optional[ThreadPoolExecutor] = None):
        if not isinstance(named_pipelines, list) or any(
            not isinstance(named_pipeline, NamedPipeline) for named_pipeline in named_pipelines
        ):
            raise ValueError("named_pipelines must be a list of NamedPipeline instances")

        names = [named_pipeline.name for named_pipeline in named_pipelines]
        if len(names) != len(set(names)):
            raise ValueError("All pipelines must have unique names")

        # Declare one input socket and one output socket per wrapped pipeline,
        # each carrying that pipeline's run() data as a plain dict.
        for named_pipeline in named_pipelines:
            component.set_input_type(self, named_pipeline.name, Dict[str, Any])
        component.set_output_types(self, **{p.name: Dict[str, Any] for p in named_pipelines})

        self.pipelines = named_pipelines
        self.executor = executor

    def run(self, **inputs):
        # Use a throwaway executor unless the caller supplied a long-lived one.
        if self.executor is None:
            with ThreadPoolExecutor() as executor:
                final_results = self._run_in_executor(executor, inputs)
        else:
            final_results = self._run_in_executor(self.executor, inputs)

        return {named_pipeline.name: result for named_pipeline, result in zip(self.pipelines, final_results)}

    def _run_in_executor(self, executor: ThreadPoolExecutor, inputs: Dict[str, Any]):
        # map() preserves submission order, so results line up with self.pipelines.
        results = executor.map(
            lambda named_pipeline: named_pipeline.pipeline.run(data=inputs[named_pipeline.name]),
            self.pipelines,
        )
        return list(results)

https://github.com/Redna/haystack-extensions/blob/main/tests/test_runner.py

def test_concurrent_pipeline_runner(self):
    component_call_stack = []

    def callback(component):
        component_call_stack.append(component)

    simple_component_1 = SimpleComponent(wait_time=0.09, callback=callback)
    pipeline1 = Pipeline()
    pipeline1.add_component("simple_component", simple_component_1)

    simple_component_2 = SimpleComponent(wait_time=0.02, callback=callback)
    pipeline2 = Pipeline()
    pipeline2.add_component("simple_component", simple_component_2)

    concurrent_pipeline_runner = ConcurrentPipelineRunner(
        [NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)]
    )

    overall_pipeline = Pipeline()
    overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

    results = overall_pipeline.run(
        data={
            "concurrent_pipeline_runner": {
                "pipeline1": {"simple_component": {"increment": 1}},
                "pipeline2": {"simple_component": {"increment": 2, "number": 10}},
            }
        }
    )

    assert results == {
        'concurrent_pipeline_runner': {
            'pipeline1': {'simple_component': {'number': 6}},
            'pipeline2': {'simple_component': {'number': 12}},
        }
    }
    # The component with the shorter wait time finishes first, showing the
    # pipelines really ran concurrently rather than in submission order.
    assert len(component_call_stack) == 2
    assert component_call_stack[0] == simple_component_2
    assert component_call_stack[1] == simple_component_1

@mikebellerU (Author)

Thanks @Redna -- it's an interesting solution. One of the problems I was struggling with -- setting the input and output types -- is addressed here. Another issue I still struggle with is that the data management gets quite complicated. The output looks like this:

assert results == {
    'concurrent_pipeline_runner': {
        'pipeline1': {'simple_component': {'number': 6}},
        'pipeline2': {'simple_component': {'number': 12}},
    }
}

Someone trying to get results from these many pipelines has to know a lot about their internals. Do you know of any guidance on a "good" way to build reusable pipelines with Haystack? Reusable in the sense that they are flexible, but the user of the reusable pipeline doesn't have to know all the details of its internal components in order to run it.

@vblagoje (Member)

Hey @mikebellerU and @Redna, before jumping onto some of these great ideas about pipeline executors, let's focus on making https://github.com/deepset-ai/haystack/compare/massi/pipeline-component actually work. The main remaining "issue" is (de)serialization of these super components, so they can be saved/loaded/reused/shared, perhaps not only by yourself but within the community. What do you think? I'm working on some other items right now but would love to contribute in the coming weeks.

@mikebellerU (Author)

I tried playing around with @masci's component (I tweaked it so it would work, at least for my case). Here is what I learned: it quickly gets too hard to manage the levels of data input and output. To invoke the "parent" pipeline, you may have to understand the detailed run signature of the "child" pipeline.

Right now, to invoke a typical Haystack 2.0 RAG pipeline, I have to write something like response = pipeline.run(data={'retriever': ..., 'embedder': ..., 'llm': ...}), and then when it returns I have to pick out response['answer_builder']['answers'][0].data to get the result I'm interested in. Wouldn't it be better if there was a way to encapsulate the knowledge about running this pipeline in a runner method (or some other name), with a signature like answer = rag_pipeline.runner(query=..., docstore=...)? This method could live alongside run, potentially as a wrapper for it, but would allow for a type-and-parameter-checked reusable pipeline that abstracts away its internal details.
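
For concreteness, here is a rough sketch of that idea, assuming a typical Haystack 2.0 RAG pipeline. RAGRunner, the runner method, and the exact component inputs are hypothetical illustrations, not an existing Haystack API:

from haystack import Pipeline

class RAGRunner:
    """Hypothetical facade: a typed, parameter-checked entry point for one RAG pipeline."""

    def __init__(self, pipeline: Pipeline):
        self.pipeline = pipeline

    def runner(self, query: str) -> str:
        # Fan the single query out to the components that need it (component
        # names follow the run() example above; adjust for your pipeline),
        # then dig the answer back out of the nested result structure.
        response = self.pipeline.run(data={
            "embedder": {"text": query},
            # retriever/llm inputs elided, as in the run() call above
        })
        return response["answer_builder"]["answers"][0].data

# Usage: answer = RAGRunner(rag_pipeline).runner(query="...")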

TLDR: solving composability of pipelines needs, I think, some thought about how to abstract away the underlying details, and that should factor into the design of PipelineRunner.

@masci added the P2 Medium priority, add to the next sprint if no P1 available label on May 21, 2024
@julian-risch self-assigned this on May 21, 2024