Skip to content

Flow

lazyllm.flow.FlowBase

Bases: SessionConfigableBase

Base class for constructing flow-like structures that can hold multiple items and organize them hierarchically.

This class allows combining different objects (including FlowBase instances or other types) into a structured flow, with optional names for each item, enabling both name-based and index-based access. Items in the structure can be added or traversed dynamically.

Parameters:

  • *items

    Items to be included in the flow, which can be instances of FlowBase or other objects.

  • item_names (list of str, default: None ) –

    A list of names corresponding to the items, paired with items in order. If not provided, all items will be assigned None as their name.

  • auto_capture (bool, default: False ) –

    Whether to enable automatic variable capture. If True, when used as a context manager, newly defined variables in the current scope will be automatically added to the flow. Defaults to False.

Source code in lazyllm/flow/flow.py
class FlowBase(SessionConfigableBase, metaclass=_MetaBind):
    """Base class for constructing flow-like structures that can hold multiple items and organize them hierarchically.

This class allows combining different objects (including ``FlowBase`` instances or other types)
into a structured flow, with optional names for each item, enabling both name-based and index-based access.
Items in the structure can be added or traversed dynamically.

Args:
    *items: Items to be included in the flow, which can be instances of ``FlowBase`` or other objects.
    item_names (list of str, optional): A list of names corresponding to the items, paired with ``items`` in order.
        If not provided, all items will be assigned ``None`` as their name.
    auto_capture (bool, optional): Whether to enable automatic variable capture. If ``True``, when used
        as a context manager, newly defined variables in the current scope will be automatically added to the flow.
        Defaults to ``False``.
"""
    def __init__(self, *items, item_names=None, auto_capture=False, id: Optional[str] = None, name: Optional[str] = None,
                 group_id: Optional[str] = None) -> None:
        super().__init__(id=id, name=name, group_id=group_id)
        self._father = None
        self._items, self._item_names, self._item_ids, self._item_pos = [], [], [], []
        self._auto_capture = auto_capture
        self._capture = True
        self._curr_frame = None
        self._find_user_instantiation_frame()

        for k, v in zip(item_names if item_names else repeat(None), items):
            self._add(k, v)

        self._capture = False
        self._auto_registered = False

    def __post_init__(self): pass

    @property
    def _flow_id(self):
        return self._config_id

    def _make_step_item(self, v):
        if isinstance(v, type):
            return v()
        if isinstance(v, bind) and v._wraps_plain_callable():
            register_hooks(v, resolve_builtin_hooks(v))
            return v
        if _is_function(v) or v in self._items:
            return _FuncWrap(v)
        return v

    def _add(self, k, v):
        assert self._capture, f'_add can only be used in `{self.__class__}.__init__` or `with {self.__class__}()`'
        item = self._make_step_item(v)
        self._items.append(item)
        self._item_ids.append(k or str(uuid.uuid4().hex))
        self._item_pos.append(_get_callsite(depth=3))
        if isinstance(item, FlowBase): item._father = self
        if k:
            assert k not in self._item_names, f'Duplicated names {k}'
            self._item_names.append(k)
        if self._curr_frame and isinstance(item, FlowBase) and k and k not in self._curr_frame.f_locals:
            self._curr_frame.f_locals[k] = item  # make sense only when locals is globals

    def __enter__(self, __frame=None):
        assert len(self._items) == 0, f'Cannot init {self.__class__} with items if you want to use it by context.'
        self._curr_frame = __frame if __frame else inspect.currentframe().f_back
        if self._auto_capture:
            self._frame_keys = list(self._curr_frame.f_locals.keys())
        self._capture = True
        _get_flow_stack().append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._auto_capture:
            locals = self._curr_frame.f_locals.copy()
            for var, val in locals.items():
                if var != 'self' and var not in self._frame_keys and (
                        (val._f if isinstance(val, bind) else val) is not self):
                    self._add(var, val)
        assert _get_flow_stack().pop() == self, 'Do not operate FLow.stack manually!'
        self._capture = False
        self._curr_frame = None
        self.__post_init__()
        return False

    def __iter__(self):
        # used to support `with pipeline() as (a.b, b):`
        return iter([self, self])

    def _find_user_instantiation_frame(self):
        try:
            for fm in inspect.stack():
                if fm.frame.f_globals.get('__name__', '').startswith('lazyllm.flow') or fm.filename.startswith('<'):
                    continue
                self._defined_file = os.path.abspath(fm.frame.f_code.co_filename)
                self._defined_func = fm.frame.f_code.co_name
                self._defined_pos = f'"file: {self._defined_file}", line {fm.frame.f_lineno}({self._defined_func})'
                break
        except Exception:
            self._defined_file, self._defined_pos, self._defined_func = None, None, None

    def _defined_at_the_same_scope(self, other: 'FlowBase'):
        return self._defined_file == other._defined_file and self._defined_func == other._defined_func

    def __setattr__(self, name: str, value):
        if '_capture' in self.__dict__ and self._capture and not name.startswith('_'):
            if hasattr(value, '_module_id') and hasattr(value, 'name') and value.name is None:
                value.name = name
            if len(_get_flow_stack()) > 1 and not self._auto_registered:
                super(__class__, self).__setattr__('_auto_registered', True)
                frame_locals = self._curr_frame.f_locals.copy()
                for key, item in frame_locals.items():
                    if item == self and (parent := _get_flow_stack()[-2]) != item:
                        if key not in parent._item_names and parent._defined_at_the_same_scope(self):
                            parent._add(key, self)
                        break
            assert name not in self._item_names, f'Duplicated name: {name}'
            self._add(name, value)
        elif name in getattr(self, '_item_names', ()):
            raise RuntimeError(f'The setting of {self.__class__} elements must be done within the `with` statement.')
        else:
            super(__class__, self).__setattr__(name, value)

    def __getattr__(self, name):
        if '_item_names' in self.__dict__ and name in self._item_names:
            return self._items[self._item_names.index(name)]
        raise AttributeError(f'{self.__class__} object has no attribute {name}')

    def id(self, module=None):
        """Get the identifier for a module or the flow itself. If a string is provided, it is returned as-is. If a bound module is provided, returns its associated item_id. If no argument is given, returns the unique id of the entire flow.

Args:
    module (Optional[Union[str, Any]]): Target module or string identifier.

**Returns:**

- str: Corresponding identifier string.
"""
        if isinstance(module, str): return module
        return self._item_ids[self._items.index(module)] if module else self._flow_id

    @property
    def is_root(self):
        """A property that indicates whether the current flow item is the root of the flow structure.

**Returns:**

- bool: True if the current item has no parent (`` _father`` is None), otherwise False.


Examples:
    >>> import lazyllm
    >>> p = lazyllm.pipeline()
    >>> p.is_root
    True
    >>> p2 = lazyllm.pipeline(p)
    >>> p.is_root
    False
    >>> p2.is_root
    True
    """
        return self._father is None

    @property
    def ancestor(self):
        """A property that returns the topmost ancestor of the current flow item.

If the current item is the root, it returns itself.

**Returns:**

- FlowBase: The topmost ancestor flow item.


Examples:
    >>> import lazyllm
    >>> p = lazyllm.pipeline()
    >>> p2 = lazyllm.pipeline(p)
    >>> p.ancestor is p2
    True
    """
        if self.is_root: return self
        return self._father.ancestor

    def for_each(self, filter, action):
        """Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter.

Args:
    filter (callable): A function that takes an item as input and returns True if the item should have the action applied.
    action (callable): A function that takes an item as input and performs some operation on it.

**Returns:**

- None


Examples:
    >>> import lazyllm
    >>> def test1(): print('1')
    ... 
    >>> def test2(): print('2')
    ... 
    >>> def test3(): print('3')
    ... 
    >>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
    >>> flow.for_each(lambda x: callable(x), lambda x: print(x))
    <Function type=test1>
    <Function type=test2>
    <Function type=test3>
    """
        for item in self._items:
            if isinstance(item, FlowBase):
                item.for_each(filter, action)
            elif filter(item):
                action(item)

ancestor property

A property that returns the topmost ancestor of the current flow item.

If the current item is the root, it returns itself.

Returns:

  • FlowBase: The topmost ancestor flow item.

Examples:

>>> import lazyllm
>>> p = lazyllm.pipeline()
>>> p2 = lazyllm.pipeline(p)
>>> p.ancestor is p2
True

is_root property

A property that indicates whether the current flow item is the root of the flow structure.

Returns:

  • bool: True if the current item has no parent (_father is None), otherwise False.

Examples:

>>> import lazyllm
>>> p = lazyllm.pipeline()
>>> p.is_root
True
>>> p2 = lazyllm.pipeline(p)
>>> p.is_root
False
>>> p2.is_root
True

for_each(filter, action)

Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter.

Parameters:

  • filter (callable) –

    A function that takes an item as input and returns True if the item should have the action applied.

  • action (callable) –

    A function that takes an item as input and performs some operation on it.

Returns:

  • None

Examples:

>>> import lazyllm
>>> def test1(): print('1')
... 
>>> def test2(): print('2')
... 
>>> def test3(): print('3')
... 
>>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
>>> flow.for_each(lambda x: callable(x), lambda x: print(x))
<Function type=test1>
<Function type=test2>
<Function type=test3>
Source code in lazyllm/flow/flow.py
    def for_each(self, filter, action):
        """Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter.

Args:
    filter (callable): A function that takes an item as input and returns True if the item should have the action applied.
    action (callable): A function that takes an item as input and performs some operation on it.

**Returns:**

- None


Examples:
    >>> import lazyllm
    >>> def test1(): print('1')
    ... 
    >>> def test2(): print('2')
    ... 
    >>> def test3(): print('3')
    ... 
    >>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
    >>> flow.for_each(lambda x: callable(x), lambda x: print(x))
    <Function type=test1>
    <Function type=test2>
    <Function type=test3>
    """
        for item in self._items:
            if isinstance(item, FlowBase):
                item.for_each(filter, action)
            elif filter(item):
                action(item)

id(module=None)

Get the identifier for a module or the flow itself. If a string is provided, it is returned as-is. If a bound module is provided, returns its associated item_id. If no argument is given, returns the unique id of the entire flow.

Parameters:

  • module (Optional[Union[str, Any]], default: None ) –

    Target module or string identifier.

Returns:

  • str: Corresponding identifier string.
Source code in lazyllm/flow/flow.py
    def id(self, module=None):
        """Get the identifier for a module or the flow itself. If a string is provided, it is returned as-is. If a bound module is provided, returns its associated item_id. If no argument is given, returns the unique id of the entire flow.

Args:
    module (Optional[Union[str, Any]]): Target module or string identifier.

**Returns:**

- str: Corresponding identifier string.
"""
        if isinstance(module, str): return module
        return self._item_ids[self._items.index(module)] if module else self._flow_id

lazyllm.flow.LazyLLMFlowsBase

Bases: FlowBase

A base class for flow structures with hook support and unified execution logic.

LazyLLMFlowsBase is the base class for all LazyLLM flow types. It organizes a sequence of callable modules into a flow and provides support for pre/post hooks, synchronization control, post-processing, and error-safe invocation. It is not intended for direct use but instead serves as a foundational class for concrete flow types like Pipeline, Parallel, etc.

input --> [Flow module1 -> Flow module2 -> ... -> Flow moduleN] --> output
                   ↑             ↓
               pre_hook       post_hook

Parameters:

  • args

    A sequence of callables representing the flow modules.

  • post_action

    An optional callable applied to the output after main flow execution. Defaults to None

  • auto_capture

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • **kw

    Key-value pairs for named components.

Source code in lazyllm/flow/flow.py
class LazyLLMFlowsBase(FlowBase, metaclass=LazyLLMRegisterMetaClass):
    """A base class for flow structures with hook support and unified execution logic.

`LazyLLMFlowsBase` is the base class for all LazyLLM flow types. It organizes a sequence of callable modules into a flow and provides support for pre/post hooks, synchronization control, post-processing, and error-safe invocation. It is not intended for direct use but instead serves as a foundational class for concrete flow types like `Pipeline`, `Parallel`, etc.

```text
input --> [Flow module1 -> Flow module2 -> ... -> Flow moduleN] --> output
                   ↑             ↓
               pre_hook       post_hook
```

Args:
    args: A sequence of callables representing the flow modules.
    post_action: An optional callable applied to the output after main flow execution. Defaults to ``None``。
    auto_capture: If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    **kw: Key-value pairs for named components.
"""
    def __init__(self, *args, post_action=None, auto_capture=False, id: Optional[str] = None,
                 name: Optional[str] = None, group_id: Optional[str] = None, **kw):
        assert len(args) == 0 or len(kw) == 0, f'Cannot provide args `{args}` and kwargs `{kw}` at the same time'
        if len(args) > 0 and isinstance(args[0], (tuple, list)):
            assert len(args) == 1, 'args should be list of callable functions'
            args = args[0]
        args = list(args) + [v() if isinstance(v, type) else v for v in kw.values()]
        super(__class__, self).__init__(*args, item_names=list(kw.keys()), auto_capture=auto_capture,
                                        id=id, name=name, group_id=group_id)
        self.post_action = post_action() if isinstance(post_action, type) else post_action
        self._sync = False
        self._hooks = []
        register_hooks(self, resolve_builtin_hooks(self))

    @execution_with_hooks
    def __call__(self, *args, **kw):
        with globals.stack_enter(self.identities):
            output = self._run(args[0] if len(args) == 1 else package(args), **kw)
        if self.post_action is not None:
            self.invoke(self.post_action, output)
        if self._sync:
            self.wait()
        return self._post_process(output)

    def register_hook(self, hook_type: LazyLLMHook):
        """Register a hook type for additional processing before and after the flow execution.

Args:
    hook_type (LazyLLMHook): The hook type or instance to register.
"""
        if not (isinstance(hook_type, LazyLLMHook)
                or (isinstance(hook_type, type) and issubclass(hook_type, LazyLLMHook))):
            raise TypeError(f'Invalid hook type: {type(hook_type)}, '
                            'must be subclass or instance of LazyLLMHook')
        if hook_type not in self._hooks:
            self._hooks.append(hook_type)

    def unregister_hook(self, hook_type: LazyLLMHook):
        """Unregister a previously registered hook.

Args:
    hook_type (LazyLLMHook): The hook type or instance to remove.
"""
        if hook_type in self._hooks:
            self._hooks.remove(hook_type)

    def clear_hooks(self):
        """Clear all registered hooks.
"""
        self._hooks = []

    def _post_process(self, output):
        return output

    def _run(self, __input, **kw):
        raise NotImplementedError

    def start(self, *args, **kw):
        """Start flow processing execution (deprecated).

This method is deprecated, it is recommended to directly call the flow instance as a function. Executes the flow processing and returns the result.

Args:
    *args: Variable positional arguments passed to the flow processing.
    **kw: Named arguments passed to the flow processing.

**Returns:**

- The result of flow processing.

**Note:**

- This method is marked as deprecated, please use direct invocation of the flow instance instead.
"""
        lazyllm.LOG.warning('start is depreciated, please use flow as a function instead')
        return self(*args, **kw)

    def set_sync(self, sync=True):
        """Set whether the flow executes synchronously.

Args:
    sync (bool): Whether to execute synchronously. Default is True.

**Returns:**

- LazyLLMFlowsBase: The current instance.
"""
        self._sync = sync
        return self

    def __repr__(self):
        subs = [repr(it) for it in self._items]
        if self.post_action is not None:
            subs.append(lazyllm.make_repr('Flow', 'PostAction', subs=[self.post_action.__repr__()]))
        return lazyllm.make_repr('Flow', self.__class__.__name__, subs=subs, items=self._item_names)

    def wait(self):
        """Wait for all asynchronous tasks in the flow to complete.

**Returns:**

- LazyLLMFlowsBase: The current instance.
"""
        def filter(x):
            return hasattr(x, 'job') and isinstance(x.job, ReadOnlyWrapper) and not x.job.isNone()
        self.for_each(filter, lambda x: x.job.wait())
        return self

    # bind_args: dict(input=input, args=dict(key=value))
    def invoke(self, it, __input, *, bind_args_source=None, **kw):
        """Invoke a target (function, module, or bind object) with the given input.  
Supports root/pipeline output replacement for bind objects.

Args:
    it (Callable | bind): The target to invoke.
    __input (Any): Input data.
    bind_args_source (Any, optional): Source of bind arguments.
    **kw: Additional keyword arguments.
"""
        if isinstance(it, bind):
            if isinstance(self, Pipeline):
                it._args = [self.output(a) if a in self._items else a for a in it._args]
                it._kw = {k: self.output(v) if v in self._items else v for k, v in it._kw.items()}
            kw['_bind_args_source'] = bind_args_source
        try:
            if not isinstance(it, LazyLLMFlowsBase) and isinstance(__input, (package, kwargs)):
                return it(*__input, **kw) if isinstance(__input, package) else it(**__input, **kw)
            else:
                return it(__input, **kw)
        except HandledException as e: raise e
        except Exception as e:
            try:
                pos = self._item_pos[self._items.index(it)]
            except Exception:
                pos = None
            if '_bind_args_source' in kw: kw = (kw.get('_bind_args_source') or {}).pop('kwargs', None)
            err_msg = (f'Flow defined at {self._defined_pos or "Unknown position"} encountered an error:\n'
                       f'invoking `{it}`({pos or "Position not found"}) with input `{__input}` and kw `{kw}` failed. '
                       + 'Details: `{type}: {value}`'.format(type=type(e).__name__, value=str(e).replace('\n', '\\n')))
            LOG.error(err_msg)
            LOG.debug(f'Error type: {type(e).__name__}, Error message: {str(e)}\n'
                      f'Traceback: {"".join(traceback.format_exception(*sys.exc_info()))}')
            raise _change_exception_type(e, FlowException) from None

    def bind(self, *args, **kw):
        """Bind arguments to the current flow, producing a bind object.

Args:
    *args: Positional arguments.
    **kw: Keyword arguments.

**Returns:**

- bind: The bound bind object.
"""
        return bind(self, *args, **kw)

bind(*args, **kw)

Bind arguments to the current flow, producing a bind object.

Parameters:

  • *args

    Positional arguments.

  • **kw

    Keyword arguments.

Returns:

  • bind: The bound bind object.
Source code in lazyllm/flow/flow.py
    def bind(self, *args, **kw):
        """Bind arguments to the current flow, producing a bind object.

Args:
    *args: Positional arguments.
    **kw: Keyword arguments.

**Returns:**

- bind: The bound bind object.
"""
        return bind(self, *args, **kw)

clear_hooks()

Clear all registered hooks.

Source code in lazyllm/flow/flow.py
    def clear_hooks(self):
        """Clear all registered hooks.
"""
        self._hooks = []

invoke(it, __input, *, bind_args_source=None, **kw)

Invoke a target (function, module, or bind object) with the given input.
Supports root/pipeline output replacement for bind objects.

Parameters:

  • it (Callable | bind) –

    The target to invoke.

  • __input (Any) –

    Input data.

  • bind_args_source (Any, default: None ) –

    Source of bind arguments.

  • **kw

    Additional keyword arguments.

Source code in lazyllm/flow/flow.py
    def invoke(self, it, __input, *, bind_args_source=None, **kw):
        """Invoke a target (function, module, or bind object) with the given input.  
Supports root/pipeline output replacement for bind objects.

Args:
    it (Callable | bind): The target to invoke.
    __input (Any): Input data.
    bind_args_source (Any, optional): Source of bind arguments.
    **kw: Additional keyword arguments.
"""
        if isinstance(it, bind):
            if isinstance(self, Pipeline):
                it._args = [self.output(a) if a in self._items else a for a in it._args]
                it._kw = {k: self.output(v) if v in self._items else v for k, v in it._kw.items()}
            kw['_bind_args_source'] = bind_args_source
        try:
            if not isinstance(it, LazyLLMFlowsBase) and isinstance(__input, (package, kwargs)):
                return it(*__input, **kw) if isinstance(__input, package) else it(**__input, **kw)
            else:
                return it(__input, **kw)
        except HandledException as e: raise e
        except Exception as e:
            try:
                pos = self._item_pos[self._items.index(it)]
            except Exception:
                pos = None
            if '_bind_args_source' in kw: kw = (kw.get('_bind_args_source') or {}).pop('kwargs', None)
            err_msg = (f'Flow defined at {self._defined_pos or "Unknown position"} encountered an error:\n'
                       f'invoking `{it}`({pos or "Position not found"}) with input `{__input}` and kw `{kw}` failed. '
                       + 'Details: `{type}: {value}`'.format(type=type(e).__name__, value=str(e).replace('\n', '\\n')))
            LOG.error(err_msg)
            LOG.debug(f'Error type: {type(e).__name__}, Error message: {str(e)}\n'
                      f'Traceback: {"".join(traceback.format_exception(*sys.exc_info()))}')
            raise _change_exception_type(e, FlowException) from None

register_hook(hook_type)

Register a hook type for additional processing before and after the flow execution.

Parameters:

  • hook_type (LazyLLMHook) –

    The hook type or instance to register.

Source code in lazyllm/flow/flow.py
    def register_hook(self, hook_type: LazyLLMHook):
        """Register a hook type for additional processing before and after the flow execution.

Args:
    hook_type (LazyLLMHook): The hook type or instance to register.
"""
        if not (isinstance(hook_type, LazyLLMHook)
                or (isinstance(hook_type, type) and issubclass(hook_type, LazyLLMHook))):
            raise TypeError(f'Invalid hook type: {type(hook_type)}, '
                            'must be subclass or instance of LazyLLMHook')
        if hook_type not in self._hooks:
            self._hooks.append(hook_type)

set_sync(sync=True)

Set whether the flow executes synchronously.

Parameters:

  • sync (bool, default: True ) –

    Whether to execute synchronously. Default is True.

Returns:

  • LazyLLMFlowsBase: The current instance.
Source code in lazyllm/flow/flow.py
    def set_sync(self, sync=True):
        """Set whether the flow executes synchronously.

Args:
    sync (bool): Whether to execute synchronously. Default is True.

**Returns:**

- LazyLLMFlowsBase: The current instance.
"""
        self._sync = sync
        return self

start(*args, **kw)

Start flow processing execution (deprecated).

This method is deprecated, it is recommended to directly call the flow instance as a function. Executes the flow processing and returns the result.

Parameters:

  • *args

    Variable positional arguments passed to the flow processing.

  • **kw

    Named arguments passed to the flow processing.

Returns:

  • The result of flow processing.

Note:

  • This method is marked as deprecated, please use direct invocation of the flow instance instead.
Source code in lazyllm/flow/flow.py
    def start(self, *args, **kw):
        """Start flow processing execution (deprecated).

This method is deprecated, it is recommended to directly call the flow instance as a function. Executes the flow processing and returns the result.

Args:
    *args: Variable positional arguments passed to the flow processing.
    **kw: Named arguments passed to the flow processing.

**Returns:**

- The result of flow processing.

**Note:**

- This method is marked as deprecated, please use direct invocation of the flow instance instead.
"""
        lazyllm.LOG.warning('start is depreciated, please use flow as a function instead')
        return self(*args, **kw)

unregister_hook(hook_type)

Unregister a previously registered hook.

Parameters:

  • hook_type (LazyLLMHook) –

    The hook type or instance to remove.

Source code in lazyllm/flow/flow.py
    def unregister_hook(self, hook_type: LazyLLMHook):
        """Unregister a previously registered hook.

Args:
    hook_type (LazyLLMHook): The hook type or instance to remove.
"""
        if hook_type in self._hooks:
            self._hooks.remove(hook_type)

wait()

Wait for all asynchronous tasks in the flow to complete.

Returns:

  • LazyLLMFlowsBase: The current instance.
Source code in lazyllm/flow/flow.py
    def wait(self):
        """Wait for all asynchronous tasks in the flow to complete.

**Returns:**

- LazyLLMFlowsBase: The current instance.
"""
        def filter(x):
            return hasattr(x, 'job') and isinstance(x.job, ReadOnlyWrapper) and not x.job.isNone()
        self.for_each(filter, lambda x: x.job.wait())
        return self

lazyllm.flow.Pipeline

Bases: LazyLLMFlowsBase

A sequential execution model that forms a pipeline of processing stages.

The Pipeline class is a linear sequence of processing stages, where the output of one stage becomes the input to the next. It supports the addition of post-actions that can be performed after the last stage. It is a subclass of LazyLLMFlowsBase which provides a lazy execution model and allows for functions to be wrapped and registered in a lazy manner.

Parameters:

  • args (list of callables or single callable, default: () ) –

    The processing stages of the pipeline. Each element can be a callable function or an instance of LazyLLMFlowsBase.FuncWrap. If a single list or tuple is provided, it is unpacked as the stages of the pipeline.

  • post_action (callable, default: None ) –

    An optional action to perform after the last stage of the pipeline. Defaults to None.

  • auto_capture (bool, default: False ) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • kwargs (dict of callables) –

    Named processing stages of the pipeline. Each key-value pair represents a named stage, where the key is the name and the value is the callable stage.

Returns:

  • The output of the last stage of the pipeline.

Examples:

>>> import lazyllm
>>> ppl = lazyllm.pipeline(
...     stage1=lambda x: x+1,
...     stage2=lambda x: f'get {x}'
... )
>>> ppl(1)
'get 2'
>>> ppl.stage2
<Function type=lambda>
Source code in lazyllm/flow/flow.py
class Pipeline(LazyLLMFlowsBase):
    """A sequential execution model that forms a pipeline of processing stages.

The ``Pipeline`` class is a linear sequence of processing stages, where the output of one stage becomes the input to the next. It supports the addition of post-actions that can be performed after the last stage. It is a subclass of ``LazyLLMFlowsBase`` which provides a lazy execution model and allows for functions to be wrapped and registered in a lazy manner.

Args:
    args (list of callables or single callable): The processing stages of the pipeline. Each element can be a callable function or an instance of ``LazyLLMFlowsBase.FuncWrap``. If a single list or tuple is provided, it is unpacked as the stages of the pipeline.
    post_action (callable, optional): An optional action to perform after the last stage of the pipeline. Defaults to None.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    kwargs (dict of callables): Named processing stages of the pipeline. Each key-value pair represents a named stage, where the key is the name and the value is the callable stage.

**Returns:**

- The output of the last stage of the pipeline.


Examples:
    >>> import lazyllm
    >>> ppl = lazyllm.pipeline(
    ...     stage1=lambda x: x+1,
    ...     stage2=lambda x: f'get {x}'
    ... )
    >>> ppl(1)
    'get 2'
    >>> ppl.stage2
    <Function type=lambda>
    """
    def __init__(self, *args, post_action=None, auto_capture=False, save_result=None, **kw):
        super().__init__(*args, post_action=post_action, auto_capture=auto_capture, **kw)
        self._save_flow_result = save_result if save_result is not None else (
            _get_current_save_flag() or config['save_flow_result'])

    @property
    def save_flow_result(self): return self._save_flow_result

    @save_flow_result.setter
    def save_flow_result(self, value): self._save_flow_result = value

    @property
    def _loop_count(self):
        return getattr(self, '_loop_count_var', 1)

    @_loop_count.setter
    def _loop_count(self, count):
        assert count > 1, 'At least one loop is required!'
        self._loop_count_var = count

    @property
    def _stop_condition(self):
        return getattr(self, '_stop_condition_var', None)

    @_stop_condition.setter
    def _stop_condition(self, cond):
        self._stop_condition_var = cond

    @property
    def _judge_on_full_input(self):
        return getattr(self, '_judge_on_full_input_var', True)

    @_judge_on_full_input.setter
    def _judge_on_full_input(self, judge):
        self._judge_on_full_input_var = judge

    @property
    def input(self): return bind.Args(self.id())
    @property
    def kwargs(self): return bind.Args(self.id(), 'kwargs')
    def output(self, module, unpack=False):
        """Get the output result of a specified module in the pipeline.

Args:
    module: The module to get output from. Can be a module object or module name.
    unpack (bool): Whether to unpack the output result. Defaults to False.

**Returns:**

- bind.Args: A bound argument object for data passing in the pipeline.
"""
        return bind.Args(self.id(), self.id(module), unpack=unpack)

    def _run(self, __input, **kw):
        output = __input
        bind_args_source = dict(source=self.id(), input=output, kwargs=kw.copy())
        bind_flag = self.save_flow_result if (flag := _get_current_save_flag()) is None else flag
        if bind_flag:
            lazyllm.LOG.debug(f'add {self.id()} to bind_args')
            locals['bind_args'][self.id()] = bind_args_source
        _iteration_idx = -1
        for _iteration_idx in range(self._loop_count):
            for it in self._items:
                output = self.invoke(it, output, bind_args_source=bind_args_source, **kw)
                kw.clear()
                bind_args_source[self.id(it)] = output
            exp = output
            if not self._judge_on_full_input:
                assert isinstance(output, tuple) and len(output) >= 2
                exp = output[0]
                output = output[1:]
            if callable(self._stop_condition) and self.invoke(self._stop_condition, exp): break
        if isinstance(self, Loop) and isinstance(tr := globals.get('trace'), dict):
            tr.setdefault('actual_iterations', {})[self.id()] = _iteration_idx + 1
        if bind_flag:
            lazyllm.LOG.debug(f'delete {self.id()} form bind_args')
            locals['bind_args'].pop(self.id(), None)
        return output

output(module, unpack=False)

Get the output result of a specified module in the pipeline.

Parameters:

  • module

    The module to get output from. Can be a module object or module name.

  • unpack (bool, default: False ) –

    Whether to unpack the output result. Defaults to False.

Returns:

  • bind.Args: A bound argument object for data passing in the pipeline.
Source code in lazyllm/flow/flow.py
    def output(self, module, unpack=False):
        """Get the output result of a specified module in the pipeline.

Args:
    module: The module to get output from. Can be a module object or module name.
    unpack (bool): Whether to unpack the output result. Defaults to False.

**Returns:**

- bind.Args: A bound argument object for data passing in the pipeline.
"""
        return bind.Args(self.id(), self.id(module), unpack=unpack)

lazyllm.flow.save_pipeline_result(flag=True)

A context manager that temporarily sets whether to save intermediate results during pipeline execution.

When entering the context, Pipeline.g_save_flow_result is set to the given value. After exiting, it restores the previous value. Useful for debugging or recording intermediate outputs.

Parameters:

  • flag (bool, default: True ) –

    Whether to enable result saving. Defaults to True.

Returns:

  • ContextManager: A context manager.

Examples:

>>> import lazyllm
>>> pipe = lazyllm.pipeline(lambda x: x + 1, lambda x: x * 2)
>>> with lazyllm.save_pipeline_result(True):
...     result = pipe(1)
>>> result
4
Source code in lazyllm/flow/flow.py
@contextmanager
def save_pipeline_result(flag: bool = True):
    """A context manager that temporarily sets whether to save intermediate results during pipeline execution.

When entering the context, `Pipeline.g_save_flow_result` is set to the given value. After exiting, it restores the previous value. Useful for debugging or recording intermediate outputs.

Args:
    flag (bool): Whether to enable result saving. Defaults to True.

**Returns:**

- ContextManager: A context manager.


Examples:
    >>> import lazyllm
    >>> pipe = lazyllm.pipeline(lambda x: x + 1, lambda x: x * 2)
    >>> with lazyllm.save_pipeline_result(True):
    ...     result = pipe(1)
    >>> result
    4
    """
    old_flag = _get_current_save_flag()
    try:
        _set_current_save_flag(flag)
        yield
    finally:
        _set_current_save_flag(old_flag)

lazyllm.flow.Parallel

Bases: LazyLLMFlowsBase

A class for managing parallel flows in LazyLLMFlows.

This class inherits from LazyLLMFlowsBase and provides an interface for running operations in parallel or sequentially. It supports concurrent execution using threads and allows for the return of results as a dictionary.

The Parallel class can be visualized as follows:

#       /> module11 -> ... -> module1N -> out1 \
# input -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#       \> module31 -> ... -> module3N -> out3 /

The Parallel.sequential method can be visualized as follows:

# input -> module21 -> ... -> module2N -> out2 -> 

Parameters:

  • args

    Variable length argument list for the base class.

  • _scatter (bool, default: False ) –

    If True, the input is split across the items. If False, the same input is passed to all items. Defaults to False.

  • _concurrent (Union[bool, int], default: True ) –

    If True, operations will be executed concurrently using threading. If an integer, specifies the maximum number of concurrent executions. If False, operations will be executed sequentially. Defaults to True.

  • multiprocessing (bool, default: False ) –

    If True, multiprocessing will be used instead of multithreading for parallel execution. This can provide true parallelism but adds overhead for inter-process communication. Defaults to False.

  • auto_capture (bool, default: False ) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • kwargs

    Arbitrary keyword arguments for the base class.

asdict property

Tag Parallel so that the return value of each call to Parallel is changed from a tuple to a dict. When using asdict, make sure that the elements of parallel are named, for example: parallel(name=value).

astuple property

Mark Parallel so that the return value of Parallel changes from package to tuple each time it is called.

aslist property

Mark Parallel so that the return value of Parallel changes from package to list each time it is called.

sum property

Mark Parallel so that the return value of Parallel is accumulated each time it is called.

join(self, string)

Mark Parallel so that the return value of Parallel is joined by string each time it is called.

Examples:

>>> import lazyllm
>>> test1 = lambda a: a + 1
>>> test2 = lambda a: a * 4
>>> test3 = lambda a: a / 2
>>> ppl = lazyllm.parallel(test1, test2, test3)
>>> ppl(1)
(2, 4, 0.5)
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3)
>>> ppl(1)
{2, 4, 0.5}
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).asdict
>>> ppl(2)
{'a': 3, 'b': 8, 'c': 1.0}
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).astuple
>>> ppl(-1)
(0, -4, -0.5)
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).aslist
>>> ppl(0)
[1, 0, 0.0]
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\n')
>>> ppl(1)
'2\n4\n0.5'
Source code in lazyllm/flow/flow.py
class Parallel(LazyLLMFlowsBase):
    """A class for managing parallel flows in LazyLLMFlows.

This class inherits from LazyLLMFlowsBase and provides an interface for running operations in parallel or sequentially. It supports concurrent execution using threads and allows for the return of results as a dictionary.


The ``Parallel`` class can be visualized as follows:

```text
#       /> module11 -> ... -> module1N -> out1 \\
# input -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#       \> module31 -> ... -> module3N -> out3 /
```       

The ``Parallel.sequential`` method can be visualized as follows:

```text
# input -> module21 -> ... -> module2N -> out2 -> 
```

Args:
    args: Variable length argument list for the base class.
    _scatter (bool, optional): If ``True``, the input is split across the items. If ``False``, the same input is passed to all items. Defaults to ``False``.
    _concurrent (Union[bool, int], optional): If ``True``, operations will be executed concurrently using threading. If an integer, specifies the maximum number of concurrent executions. If ``False``, operations will be executed sequentially. Defaults to ``True``.
    multiprocessing (bool, optional): If ``True``, multiprocessing will be used instead of multithreading for parallel execution. This can provide true parallelism but adds overhead for inter-process communication. Defaults to ``False``.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    kwargs: Arbitrary keyword arguments for the base class.

`asdict property`

Tag ``Parallel`` so that the return value of each call to ``Parallel`` is changed from a tuple to a dict. When using ``asdict``, make sure that the elements of ``parallel`` are named, for example: ``parallel(name=value)``.

`astuple property`

Mark Parallel so that the return value of Parallel changes from package to tuple each time it is called.

`aslist property`

Mark Parallel so that the return value of Parallel changes from package to list each time it is called.

`sum property`

Mark Parallel so that the return value of Parallel is accumulated each time it is called.

`join(self, string)`

Mark Parallel so that the return value of Parallel is joined by ``string`` each time it is called.


Examples:
    >>> import lazyllm
    >>> test1 = lambda a: a + 1
    >>> test2 = lambda a: a * 4
    >>> test3 = lambda a: a / 2
    >>> ppl = lazyllm.parallel(test1, test2, test3)
    >>> ppl(1)
    (2, 4, 0.5)
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3)
    >>> ppl(1)
    {2, 4, 0.5}
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).asdict
    >>> ppl(2)
    {'a': 3, 'b': 8, 'c': 1.0}
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).astuple
    >>> ppl(-1)
    (0, -4, -0.5)
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).aslist
    >>> ppl(0)
    [1, 0, 0.0]
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\\n')
    >>> ppl(1)
    '2\\n4\\n0.5'
    """

    @staticmethod
    def _worker(func, barrier, sid, local_data, *args, global_data=None, **kw):
        lazyllm.globals._init_sid(sid)
        if global_data: lazyllm.globals._update(global_data)
        lazyllm.locals._init_sid()
        lazyllm.locals._update({k: v.copy() for k, v in local_data.items()})
        _barr.impl = barrier
        r = func(*args, **kw)
        lazyllm.locals.clear()
        return r

    class PostProcessType(Enum):
        NONE = 0
        DICT = 1
        TUPLE = 2
        LIST = 3
        SUM = 4
        JOIN = 5

    def __init__(self, *args, _scatter: bool = False, _concurrent: Union[bool, int] = True,
                 multiprocessing: bool = False, auto_capture: bool = False, **kw):
        super().__init__(*args, **kw, auto_capture=auto_capture)
        self._post_process_type = Parallel.PostProcessType.NONE
        self._post_process_args = None
        self._multiprocessing = multiprocessing or config['parallel_multiprocessing']
        self._concurrent = 0 if not _concurrent else 5 if isinstance(_concurrent, bool) else _concurrent
        self._scatter = _scatter

    @staticmethod
    def _set_status(self, type, args=None):
        assert self._post_process_type is Parallel.PostProcessType.NONE, 'Cannor set post process twice'
        self._post_process_type = type
        self._post_process_args = args
        return self

    asdict = property(partial(_set_status, type=PostProcessType.DICT))
    astuple = property(partial(_set_status, type=PostProcessType.TUPLE))
    aslist = property(partial(_set_status, type=PostProcessType.LIST))
    sum = property(partial(_set_status, type=PostProcessType.SUM))

    def join(self, string=''):
        """Marks the Parallel instance to join its results with the specified string on each call.

Args:
    string (str): The string to use for joining results. Defaults to an empty string.

**Returns:**

- Parallel: Returns the current Parallel instance configured to join results with the specified string.

**Example:**

```python
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\n')
>>> ppl(1)
'2\n4\n0.5'
```
"""
        assert isinstance(string, str), 'argument of join shoule be str'
        return Parallel._set_status(self, type=Parallel.PostProcessType.JOIN, args=string)

    @classmethod
    def sequential(cls, *args, **kw):
        """Creates a Parallel instance that executes sequentially.

This class method sets ``_concurrent`` to ``False``, causing all operations to be executed in sequence rather than in parallel.

The ``Parallel.sequential`` method can be visualized as follows:

```text
# input -> module21 -> ... -> module2N -> out2 -> 
```

Args:
    args: Variable length argument list passed to the Parallel constructor.
    kwargs: Keyword arguments passed to the Parallel constructor.
    _scatter (bool, optional): If ``True``, the input is split across the items. If ``False``, the same input is passed to all items. Defaults to ``False``.
    _concurrent (bool, optional): If ``True``, operations will be executed concurrently using threading. If ``False``, operations will be executed sequentially. Defaults to ``True``.
    multiprocessing (bool, optional): If ``True``, multiprocessing will be used instead of multithreading for parallel execution. This can provide true parallelism but adds overhead for inter-process communication. Defaults to ``False``.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    args: Variable length argument list for the base class.
    kwargs: Arbitrary keyword arguments for the base class.

**Returns:**

- Parallel: A new Parallel instance configured for sequential execution.
"""
        return cls(*args, _concurrent=False, **kw)

    # items = [a, b, c, d, e], skip_items = [1, 3]/['b', 'd'] -> items = [a, c, e]
    # input = [0, 1, 2, 3, 4] -> [0, 2, 4]
    # input = [0, 2, 4] -> [0, 2, 4]
    # input = dict(a=0, b=1, c=2, d=3, e=4) -> [0, 2, 4]
    # input = dict(a=0, c=2, e=4) -> [0, 2, 4]
    def _split_input(self, inputs, items, _kept_idx):
        if self._scatter:
            if isinstance(inputs, dict):
                inputs = package(inputs[n] for n in ([n for i, n in enumerate(self._item_names or repeat(None))
                                                     if i in _kept_idx] if _kept_idx else self._item_names))
            else:
                if isinstance(inputs, package) and len(inputs) == 1: inputs = inputs[0]
                assert isinstance(inputs, (tuple, list)), (
                    f'Only tuple and list input can be split automatically, your input is {inputs} <{type(inputs)}>')
                if _kept_idx and len(inputs) != len(_kept_idx):
                    assert len(inputs) == len(self._items)
                    inputs = [inp for i, inp in enumerate(inputs) if i in _kept_idx]
        else:
            inputs = [inputs] * len(items)
        return inputs

    def _parallel_resolve_items_and_inputs(self, __input, __items, _kept_items, _skip_items):
        if (items := __items) is None:
            items = self._items
        if _kept_items:
            if _skip_items:
                raise RuntimeError('Cannot provide `_kept_items` and `_skip_items` at the same time!')
            _kept_idx = [i for i, (_, n) in enumerate(zip(self._items, self._item_names or repeat(None)))
                         if i in _kept_items or n in _kept_items]
        else:
            _kept_idx = ([i for i, (_, n) in enumerate(zip(self._items, self._item_names or repeat(None)))
                          if i not in _skip_items and n not in _skip_items] if _skip_items else None)
        if _kept_idx:
            items = [it for i, it in enumerate(self._items) if i in _kept_idx]
        inputs = self._split_input(__input, items, _kept_idx)
        return items, inputs

    def _parallel_execute_concurrent(self, items, inputs, **kw):
        if self._multiprocessing:
            barrier, executor = None, lazyllm.ProcessPoolExecutor
            kw['global_data'] = lazyllm.globals._data
        else:
            barrier, executor = threading.Barrier(len(items)), concurrent.futures.ThreadPoolExecutor

        with executor(max_workers=self._concurrent) as e:
            # Thread workers need the parent's contextvars (OTel current span, tracing ContextVars,
            # etc.); ProcessPoolExecutor cannot use copy_context the same way (separate interpreter).
            futures = []
            for it, inp in zip(items, inputs):
                worker_call = partial(self._worker, self.invoke, barrier, lazyllm.globals._sid,
                                      lazyllm.locals._data, it, inp, **kw)
                if self._multiprocessing:
                    futures.append(e.submit(worker_call))
                else:
                    futures.append(e.submit(copy_context().run, worker_call))
            if (not_done := concurrent.futures.wait(futures).not_done):
                error_msgs = []
                for future in not_done:
                    if (exc := future.exception()) is not None:
                        if (tb := getattr(future, '_traceback', None)):
                            tb_str = ''.join(traceback.format_exception(type(exc), exc, tb))
                        else:
                            tb_str = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
                        error_msgs.append(f'Future: {future}\n{tb_str}')
                    else:
                        error_msgs.append(f'Future: {future} not complete without exception。')
                raise RuntimeError('Parallel execute failed!\n' + '\n'.join(error_msgs))
            return package([future.result() for future in futures])

    def _run(self, __input, __items=None, *, _kept_items: Optional[Union[int, str, List[Union[int, str]]]] = None,
             _skip_items: Optional[Union[int, str, List[Union[int, str]]]] = None, **kw):
        items, inputs = self._parallel_resolve_items_and_inputs(__input, __items, _kept_items, _skip_items)
        if self._concurrent:
            return self._parallel_execute_concurrent(items, inputs, **kw)
        return package(self.invoke(it, inp, **kw) for it, inp in zip(items, inputs))

    def _post_process(self, output):
        if self._post_process_type == Parallel.PostProcessType.DICT:
            assert self._item_names, 'Item name should be set when you want to return dict.'
            output = {k: v for k, v in zip(self._item_names, output)}
        elif self._post_process_type == Parallel.PostProcessType.TUPLE:
            output = tuple(output)
        elif self._post_process_type == Parallel.PostProcessType.LIST:
            output = list(output)
        elif self._post_process_type == Parallel.PostProcessType.SUM:
            output = ''.join([str(i) for i in output]) if isinstance(output[0], str) else sum(output, type(output[0])())
        elif self._post_process_type == Parallel.PostProcessType.JOIN:
            output = self._post_process_args.join([str(i) for i in output])
        return output

join(string='')

Marks the Parallel instance to join its results with the specified string on each call.

Parameters:

  • string (str, default: '' ) –

    The string to use for joining results. Defaults to an empty string.

Returns:

  • Parallel: Returns the current Parallel instance configured to join results with the specified string.

Example:

>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('
')
>>> ppl(1)
'2
4
0.5'
Source code in lazyllm/flow/flow.py
    def join(self, string=''):
        """Marks the Parallel instance to join its results with the specified string on each call.

Args:
    string (str): The string to use for joining results. Defaults to an empty string.

**Returns:**

- Parallel: Returns the current Parallel instance configured to join results with the specified string.

**Example:**

```python
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\n')
>>> ppl(1)
'2\n4\n0.5'
```
"""
        assert isinstance(string, str), 'argument of join shoule be str'
        return Parallel._set_status(self, type=Parallel.PostProcessType.JOIN, args=string)

sequential(*args, **kw) classmethod

Creates a Parallel instance that executes sequentially.

This class method sets _concurrent to False, causing all operations to be executed in sequence rather than in parallel.

The Parallel.sequential method can be visualized as follows:

# input -> module21 -> ... -> module2N -> out2 -> 

Parameters:

  • args

    Variable length argument list passed to the Parallel constructor.

  • kwargs

    Keyword arguments passed to the Parallel constructor.

  • _scatter (bool) –

    If True, the input is split across the items. If False, the same input is passed to all items. Defaults to False.

  • _concurrent (bool) –

    If True, operations will be executed concurrently using threading. If False, operations will be executed sequentially. Defaults to True.

  • multiprocessing (bool) –

    If True, multiprocessing will be used instead of multithreading for parallel execution. This can provide true parallelism but adds overhead for inter-process communication. Defaults to False.

  • auto_capture (bool) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • args

    Variable length argument list for the base class.

  • kwargs

    Arbitrary keyword arguments for the base class.

Returns:

  • Parallel: A new Parallel instance configured for sequential execution.
Source code in lazyllm/flow/flow.py
    @classmethod
    def sequential(cls, *args, **kw):
        """Creates a Parallel instance that executes sequentially.

This class method sets ``_concurrent`` to ``False``, causing all operations to be executed in sequence rather than in parallel.

The ``Parallel.sequential`` method can be visualized as follows:

```text
# input -> module21 -> ... -> module2N -> out2 -> 
```

Args:
    args: Variable length argument list passed to the Parallel constructor.
    kwargs: Keyword arguments passed to the Parallel constructor.
    _scatter (bool, optional): If ``True``, the input is split across the items. If ``False``, the same input is passed to all items. Defaults to ``False``.
    _concurrent (bool, optional): If ``True``, operations will be executed concurrently using threading. If ``False``, operations will be executed sequentially. Defaults to ``True``.
    multiprocessing (bool, optional): If ``True``, multiprocessing will be used instead of multithreading for parallel execution. This can provide true parallelism but adds overhead for inter-process communication. Defaults to ``False``.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    args: Variable length argument list for the base class.
    kwargs: Arbitrary keyword arguments for the base class.

**Returns:**

- Parallel: A new Parallel instance configured for sequential execution.
"""
        return cls(*args, _concurrent=False, **kw)

lazyllm.flow.Diverter

Bases: Parallel

A flow diverter that routes inputs through different modules in parallel.

The Diverter class is a specialized form of parallel processing where multiple inputs are each processed by a separate sequence of modules in parallel. The outputs are then aggregated and returned as a tuple.

This class is useful when you have distinct data processing pipelines that can be executed concurrently, and you want to manage them within a single flow construct.

#                 /> in1 -> module11 -> ... -> module1N -> out1 \
# (in1, in2, in3) -> in2 -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#                 \> in3 -> module31 -> ... -> module3N -> out3 /

Parameters:

  • args

    Variable length argument list representing the modules to be executed in parallel.

  • _concurrent (bool, default: True ) –

    A flag to control whether the modules should be run concurrently. Defaults to True. You can use Diverter.sequential instead of Diverter to set this variable.

  • auto_capture (bool, default: False ) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • kwargs

    Arbitrary keyword arguments representing additional modules, where the key is the name of the module.

Examples:

>>> import lazyllm
>>> div = lazyllm.diverter(lambda x: x+1, lambda x: x*2, lambda x: -x)
>>> div(1, 2, 3)
(2, 4, -3)
>>> div = lazyllm.diverter(a=lambda x: x+1, b=lambda x: x*2, c=lambda x: -x).asdict
>>> div(1, 2, 3)
{'a': 2, 'b': 4, 'c': -3}
>>> div(dict(c=3, b=2, a=1))
{'a': 2, 'b': 4, 'c': -3}
Source code in lazyllm/flow/flow.py
class Diverter(Parallel):
    """A flow diverter that routes inputs through different modules in parallel.

The Diverter class is a specialized form of parallel processing where multiple inputs are each processed by a separate sequence of modules in parallel. The outputs are then aggregated and returned as a tuple.

This class is useful when you have distinct data processing pipelines that can be executed concurrently, and you want to manage them within a single flow construct.

```text
#                 /> in1 -> module11 -> ... -> module1N -> out1 \\
# (in1, in2, in3) -> in2 -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#                 \> in3 -> module31 -> ... -> module3N -> out3 /
```                    

Args:
    args : Variable length argument list representing the modules to be executed in parallel.
    _concurrent (bool, optional): A flag to control whether the modules should be run concurrently. Defaults to ``True``. You can use ``Diverter.sequential`` instead of ``Diverter`` to set this variable.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    kwargs : Arbitrary keyword arguments representing additional modules, where the key is the name of the module.


Examples:
    >>> import lazyllm
    >>> div = lazyllm.diverter(lambda x: x+1, lambda x: x*2, lambda x: -x)
    >>> div(1, 2, 3)
    (2, 4, -3)
    >>> div = lazyllm.diverter(a=lambda x: x+1, b=lambda x: x*2, c=lambda x: -x).asdict
    >>> div(1, 2, 3)
    {'a': 2, 'b': 4, 'c': -3}
    >>> div(dict(c=3, b=2, a=1))
    {'a': 2, 'b': 4, 'c': -3}
    """
    def __init__(self, *args, _concurrent: Union[bool, int] = True, auto_capture: bool = False, **kw):
        super().__init__(*args, _scatter=True, _concurrent=_concurrent, auto_capture=auto_capture, **kw)

lazyllm.flow.Warp

Bases: Parallel

A flow warp that applies a single module to multiple inputs in parallel.

The Warp class is designed to apply the same processing module to a set of inputs. It effectively 'warps' the single module around the inputs so that each input is processed in parallel. The outputs are collected and returned as a tuple. It is important to note that this class cannot be used for asynchronous tasks, such as training and deployment.

#                 /> in1 \                            /> out1 \
# (in1, in2, in3) -> in2 -> module1 -> ... -> moduleN -> out2 -> (out1, out2, out3)
#                 \> in3 /                            \> out3 /

Parameters:

  • args

    Variable length argument list representing the single module to be applied to all inputs.

  • _scatter (bool) –

    Whether to scatter inputs into parts before processing. Defaults to False.

  • _concurrent (bool | int, default: True ) –

    Whether to execute in parallel. Can be a boolean or a max concurrency limit. Defaults to True.

  • auto_capture (bool, default: False ) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • kwargs

    Arbitrary keyword arguments for future extensions.

Note
  • Only one function is allowed in warp.
  • The Warp flow should not be used for asynchronous tasks such as training and deployment.

Examples:

>>> import lazyllm
>>> warp = lazyllm.warp(lambda x: x * 2)
>>> warp(1, 2, 3, 4)
(2, 4, 6, 8)
>>> warp = lazyllm.warp(lazyllm.pipeline(lambda x: x * 2, lambda x: f'get {x}'))
>>> warp(1, 2, 3, 4)
('get 2', 'get 4', 'get 6', 'get 8')
>>> from lazyllm import package
>>> warp1 = lazyllm.warp(lambda x, y: x * 2 + y)
>>> print(warp1([package(1,2), package(10, 20)]))
(4, 40)
Source code in lazyllm/flow/flow.py
class Warp(Parallel):
    """A flow warp that applies a single module to multiple inputs in parallel.

The Warp class is designed to apply the same processing module to a set of inputs. It effectively 'warps' the single module around the inputs so that each input is processed in parallel. The outputs are collected and returned as a tuple. It is important to note that this class cannot be used for asynchronous tasks, such as training and deployment.

```text
#                 /> in1 \                            /> out1 \\
# (in1, in2, in3) -> in2 -> module1 -> ... -> moduleN -> out2 -> (out1, out2, out3)
#                 \> in3 /                            \> out3 /
``` 

Args:
    args: Variable length argument list representing the single module to be applied to all inputs.
    _scatter (bool): Whether to scatter inputs into parts before processing. Defaults to False.
    _concurrent (bool | int): Whether to execute in parallel. Can be a boolean or a max concurrency limit. Defaults to True.
    auto_capture (bool): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    kwargs: Arbitrary keyword arguments for future extensions.

Note:
    - Only one function is allowed in warp.
    - The Warp flow should not be used for asynchronous tasks such as training and deployment.


Examples:
    >>> import lazyllm
    >>> warp = lazyllm.warp(lambda x: x * 2)
    >>> warp(1, 2, 3, 4)
    (2, 4, 6, 8)
    >>> warp = lazyllm.warp(lazyllm.pipeline(lambda x: x * 2, lambda x: f'get {x}'))
    >>> warp(1, 2, 3, 4)
    ('get 2', 'get 4', 'get 6', 'get 8')

    >>> from lazyllm import package
    >>> warp1 = lazyllm.warp(lambda x, y: x * 2 + y)
    >>> print(warp1([package(1,2), package(10, 20)]))
    (4, 40)
    """
    def __init__(self, *args, _concurrent: Union[bool, int] = True, auto_capture: bool = False, **kw):
        super().__init__(*args, _scatter=True, _concurrent=_concurrent, auto_capture=auto_capture, **kw)
        if len(self._items) > 1: self._items = [Pipeline(*self._items)]

    def __post_init__(self):
        if len(self._items) > 1: self._items = [Pipeline(*self._items)]

    def _run(self, __input, **kw):
        assert 1 == len(self._items), 'Only one function is enabled in warp'
        assert '_skip_items' not in kw, '`skip_items` is not allowed in warp'
        assert '_kept_items' not in kw, '`_kept_items` is not allowed in warp'
        return super(__class__, self)._run(__input, self._items * len(package(__input)), **kw)

    @property
    def asdict(self): raise NotImplementedError

lazyllm.flow.IFS

Bases: LazyLLMFlowsBase

Implements an If-Else functionality within the LazyLLMFlows framework.

The IFS (If-Else Flow Structure) class is designed to conditionally execute one of two provided paths (true path or false path) based on the evaluation of a given condition. After the execution of the selected path, an optional post-action can be applied, and the input can be returned alongside the output if specified.

Parameters:

  • cond (callable) –

    A callable that takes the input and returns a boolean. It determines which path to execute. If cond(input) evaluates to True, tpath is executed; otherwise, fpath is executed.

  • tpath (callable) –

    The path to be executed if the condition is True.

  • fpath (callable) –

    The path to be executed if the condition is False.

  • post_action (callable, default: None ) –

    An optional callable that is executed after the selected path. It can be used to perform cleanup or further processing. Defaults to None.

Returns:

  • The output of the executed path.

Examples:

>>> import lazyllm
>>> cond = lambda x: x > 0
>>> tpath = lambda x: x * 2
>>> fpath = lambda x: -x
>>> ifs_flow = lazyllm.ifs(cond, tpath, fpath)
>>> ifs_flow(10)
20
>>> ifs_flow(-5)
5
Source code in lazyllm/flow/flow.py
class IFS(LazyLLMFlowsBase):
    """Implements an If-Else functionality within the LazyLLMFlows framework.

The IFS (If-Else Flow Structure) class is designed to conditionally execute one of two provided
paths (true path or false path) based on the evaluation of a given condition. After the execution
of the selected path, an optional post-action can be applied, and the input can be returned alongside
the output if specified.

Args:
    cond (callable): A callable that takes the input and returns a boolean. It determines which path
                        to execute. If ``cond(input)`` evaluates to True, ``tpath`` is executed; otherwise,
                        ``fpath`` is executed.
    tpath (callable): The path to be executed if the condition is True.
    fpath (callable): The path to be executed if the condition is False.
    post_action (callable, optional): An optional callable that is executed after the selected path.
                                        It can be used to perform cleanup or further processing. Defaults to None.

**Returns:**

- The output of the executed path.


Examples:
    >>> import lazyllm
    >>> cond = lambda x: x > 0
    >>> tpath = lambda x: x * 2
    >>> fpath = lambda x: -x
    >>> ifs_flow = lazyllm.ifs(cond, tpath, fpath)
    >>> ifs_flow(10)
    20
    >>> ifs_flow(-5)
    5
    """
    def __init__(self, cond, tpath, fpath, post_action=None):
        super().__init__(cond, tpath, fpath, post_action=post_action)

    def _run(self, __input, **kw):
        cond, tpath, fpath = self._items
        try:
            flag = cond()
        except Exception:
            flag = cond if isinstance(cond, bool) else self.invoke(cond, __input, **kw)
        chosen = tpath if flag else fpath
        branch_label = 'true_path' if flag else 'false_path'
        name = getattr(chosen, '__name__', None) or type(chosen).__name__
        matched = {'branch': branch_label, 'chosen_node': name, 'condition_result': bool(flag)}
        push_ifs_matched_attrs(matched)
        return self.invoke(chosen, __input, **kw)

lazyllm.flow.Switch

Bases: LazyLLMFlowsBase

A control flow mechanism that selects and executes a flow based on a condition.

The Switch class provides a way to choose between different flows depending on the value of an expression or the truthiness of conditions. It is similar to a switch-case statement found in other programming languages.

# switch(exp):
#     case cond1: input -> module11 -> ... -> module1N -> out; break
#     case cond2: input -> module21 -> ... -> module2N -> out; break
#     case cond3: input -> module31 -> ... -> module3N -> out; break

Parameters:

  • args

    A variable length argument list, alternating between conditions and corresponding flows or functions. Conditions are either callables returning a boolean or values to be compared with the input expression.

  • conversion (callable, default: None ) –

    A function used to transform or preprocess the evaluation expression exp before performing condition matching. Defaults to None.

  • post_action (callable, default: None ) –

    A function to be called on the output after the selected flow is executed. Defaults to None.

  • judge_on_full_input (bool, default: True ) –

    If set to True, the conditional judgment will be performed through the input of switch, otherwise the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:

  • TypeError

    If an odd number of arguments are provided, or if the first argument is not a dictionary and the conditions are not provided in pairs.

Examples:

>>> import lazyllm
>>> def is_positive(x): return x > 0
...
>>> def is_negative(x): return x < 0
...
>>> switch = lazyllm.switch(is_positive, lambda x: 2 * x, is_negative, lambda x : -x, 'default', lambda x : '000', judge_on_full_input=True)
>>>
>>> switch(1)
2
>>> switch(0)
'000'
>>> switch(-4)
4
>>>
>>> def is_1(x): return True if x == 1 else False
...
>>> def is_2(x): return True if x == 2 else False
...
>>> def is_3(x): return True if x == 3 else False
...
>>> def t1(x): return 2 * x
...
>>> def t2(x): return 3 * x
...
>>> def t3(x): return x
...
>>> with lazyllm.switch(judge_on_full_input=True) as sw:
...     sw.case[is_1::t1]
...     sw.case(is_2, t2)
...     sw.case[is_3, t3]
...
>>> sw(1)
2
>>> sw(2)
6
>>> sw(3)
3
Source code in lazyllm/flow/flow.py
class Switch(LazyLLMFlowsBase):
    """A control flow mechanism that selects and executes a flow based on a condition.

The ``Switch`` class provides a way to choose between different flows depending on the value of an expression or the truthiness of conditions. It is similar to a switch-case statement found in other programming languages.

```text
# switch(exp):
#     case cond1: input -> module11 -> ... -> module1N -> out; break
#     case cond2: input -> module21 -> ... -> module2N -> out; break
#     case cond3: input -> module31 -> ... -> module3N -> out; break
``` 

Args:
    args: A variable length argument list, alternating between conditions and corresponding flows or functions. Conditions are either callables returning a boolean or values to be compared with the input expression.
    conversion (callable, optional): A function used to transform or preprocess the evaluation expression ``exp`` before performing condition matching. Defaults to ``None``.
    post_action (callable, optional): A function to be called on the output after the selected flow is executed. Defaults to ``None``.
    judge_on_full_input(bool): If set to ``True``, the conditional judgment will be performed through the input of ``switch``, otherwise the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:
    TypeError: If an odd number of arguments are provided, or if the first argument is not a dictionary and the conditions are not provided in pairs.


Examples:
    >>> import lazyllm
    >>> def is_positive(x): return x > 0
    ...
    >>> def is_negative(x): return x < 0
    ...
    >>> switch = lazyllm.switch(is_positive, lambda x: 2 * x, is_negative, lambda x : -x, 'default', lambda x : '000', judge_on_full_input=True)
    >>>
    >>> switch(1)
    2
    >>> switch(0)
    '000'
    >>> switch(-4)
    4
    >>>
    >>> def is_1(x): return True if x == 1 else False
    ...
    >>> def is_2(x): return True if x == 2 else False
    ...
    >>> def is_3(x): return True if x == 3 else False
    ...
    >>> def t1(x): return 2 * x
    ...
    >>> def t2(x): return 3 * x
    ...
    >>> def t3(x): return x
    ...
    >>> with lazyllm.switch(judge_on_full_input=True) as sw:
    ...     sw.case[is_1::t1]
    ...     sw.case(is_2, t2)
    ...     sw.case[is_3, t3]
    ...
    >>> sw(1)
    2
    >>> sw(2)
    6
    >>> sw(3)
    3
    """
    # Switch({cond1: M1, cond2: M2, ..., condN: MN})
    # Switch(cond1, M1, cond2, M2, ..., condN, MN)
    def __init__(self, *args, conversion=None, post_action=None, judge_on_full_input=True):
        if len(args) == 1 and isinstance(args[0], dict):
            self.conds, items = list(args[0].keys()), list(args[0].values())
        else:
            self.conds, items = list(args[0::2]), args[1::2]
        super().__init__(*items, post_action=post_action)
        self._judge_on_full_input = judge_on_full_input
        self._set_conversion(conversion)

    def _set_conversion(self, conversion):
        self._conversion = conversion

    def _run(self, __input, **kw):
        exp = __input
        if not self._judge_on_full_input:
            assert isinstance(__input, tuple) and len(__input) >= 2
            exp = __input[0]
            __input = __input[1] if len(__input) == 2 else __input[1:]

        if self._conversion:
            exp = self._conversion(*exp) if isinstance(exp, package) else self._conversion(exp)

        for idx, cond in enumerate(self.conds):
            if (callable(cond) and self.invoke(cond, exp) is True) or (exp == cond) or (
                    exp == package((cond,))) or cond == 'default':
                alias = self._item_names[idx] if self._item_names and idx < len(self._item_names) else None
                actual = getattr(self._items[idx], '__name__', None) or type(self._items[idx]).__name__
                branch = f'{alias} -> {actual}' if alias and alias != actual else actual
                matched = {'index': idx, 'condition': str(cond), 'branch': branch}
                push_switch_matched_attrs(matched)
                return self.invoke(self._items[idx], __input, **kw)

    class Case:
        def __init__(self, m) -> None: self._m = m
        def __call__(self, cond, func): self._m._add_case(cond, func)

        def __getitem__(self, key):
            if isinstance(key, slice):
                if key.start:
                    if (callable(key.step) and key.stop is None):
                        return self._m._add_case(key.start, key.step)
                    elif (key.step is None and callable(key.stop)):
                        return self._m._add_case(key.start, key.stop)
            elif isinstance(key, tuple) and len(key) == 2 and callable(key[1]):
                return self._m._add_case(key[0], key[1])
            raise RuntimeError(f'Only [cond::func], [cond:func] or [cond, func] is allowed in case, but you give {key}')

    @property
    def case(self): return Switch.Case(self)

    def _add_case(self, case, func):
        self.conds.append(case)
        self._add(None, func)

lazyllm.flow.Loop

Bases: Pipeline

Initializes a Loop flow structure which repeatedly applies a sequence of functions to an input until a stop condition is met or a specified count of iterations is reached.

The Loop structure allows for the definition of a simple control flow where a series of steps are applied in a loop, with an optional stop condition that can be used to exit the loop early based on the output of the steps.

Parameters:

  • *item (callable or list of callables, default: () ) –

    The function(s) or callable object(s) that will be applied in the loop.

  • stop_condition (callable, default: None ) –

    A function that takes the output of the last item in the loop as input and returns a boolean. If it returns True, the loop will stop. If None, the loop will continue until count is reached. Defaults to None.

  • count (int, default: maxsize ) –

    The maximum number of iterations to run the loop for. Defaults to sys.maxsize.

  • post_action (callable, default: None ) –

    A function to be called with the final output after the loop ends. Defaults to None.

  • auto_capture (bool, default: False ) –

    If True, variables newly defined within the with block will be automatically added to the flow. Defaults to False.

  • judge_on_full_input (bool, default: True ) –

    If set to True, the conditional judgment will be performed through the input of stop_condition; otherwise, the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:

  • AssertionError

    If the provided stop_condition is neither callable nor None.

Examples:

>>> import lazyllm
>>> loop = lazyllm.loop(lambda x: x * 2, stop_condition=lambda x: x > 10, judge_on_full_input=True)
>>> loop(1)
16
>>> loop(3)
12
>>>
>>> with lazyllm.loop(stop_condition=lambda x: x > 10, judge_on_full_input=True) as lp:
...    lp.f1 = lambda x: x + 1
...    lp.f2 = lambda x: x * 2
...
>>> lp(0)
14
Source code in lazyllm/flow/flow.py
class Loop(Pipeline):
    """Initializes a Loop flow structure which repeatedly applies a sequence of functions to an input until a stop condition is met or a specified count of iterations is reached.

The Loop structure allows for the definition of a simple control flow where a series of steps are applied in a loop, with an optional stop condition that can be used to exit the loop early based on the output of the steps.

Args:
    *item (callable or list of callables): The function(s) or callable object(s) that will be applied in the loop.
    stop_condition (callable, optional): A function that takes the output of the last item in the loop as input and returns a boolean. If it returns ``True``, the loop will stop. If ``None``, the loop will continue until ``count`` is reached. Defaults to ``None``.
    count (int, optional): The maximum number of iterations to run the loop for. Defaults to ``sys.maxsize``.
    post_action (callable, optional): A function to be called with the final output after the loop ends. Defaults to ``None``.
    auto_capture (bool, optional): If True, variables newly defined within the ``with`` block will be automatically added to the flow. Defaults to ``False``.
    judge_on_full_input (bool): If set to ``True``, the conditional judgment will be performed through the input of ``stop_condition``; otherwise, the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:
    AssertionError: If the provided ``stop_condition`` is neither callable nor ``None``.


Examples:
    >>> import lazyllm
    >>> loop = lazyllm.loop(lambda x: x * 2, stop_condition=lambda x: x > 10, judge_on_full_input=True)
    >>> loop(1)
    16
    >>> loop(3)
    12
    >>>
    >>> with lazyllm.loop(stop_condition=lambda x: x > 10, judge_on_full_input=True) as lp:
    ...    lp.f1 = lambda x: x + 1
    ...    lp.f2 = lambda x: x * 2
    ...
    >>> lp(0)
    14
    """
    def __init__(self, *item, stop_condition=None, count=sys.maxsize, post_action=None,
                 auto_capture=False, judge_on_full_input=True, **kw):
        super().__init__(*item, post_action=post_action, auto_capture=auto_capture, **kw)
        assert callable(stop_condition) or stop_condition is None
        self._judge_on_full_input = judge_on_full_input
        self._stop_condition = stop_condition
        self._loop_count = count

lazyllm.flow.Graph

Bases: LazyLLMFlowsBase

A complex flow control structure based on Directed Acyclic Graph (DAG).

The Graph class allows you to create complex processing graphs where nodes represent processing functions and edges represent data flow. It supports topological sorting to ensure correct execution order and can handle complex dependencies with multiple inputs and outputs.

The Graph class is particularly suitable for scenarios requiring complex data flow and dependency management, such as machine learning pipelines, data processing workflows, etc.

Parameters:

  • post_action (callable, default: None ) –

    A function to be called after the graph execution is complete. Defaults to None.

  • auto_capture (bool, default: False ) –

    Whether to automatically capture variables from context. Defaults to False.

  • kwargs

    Arbitrary keyword arguments representing named nodes and corresponding functions.

Returns:

  • The final output result of the graph.
Source code in lazyllm/flow/flow.py
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
class Graph(LazyLLMFlowsBase):
    """A complex flow control structure based on Directed Acyclic Graph (DAG).

The Graph class allows you to create complex processing graphs where nodes represent processing functions and edges represent data flow. It supports topological sorting to ensure correct execution order and can handle complex dependencies with multiple inputs and outputs.

The Graph class is particularly suitable for scenarios requiring complex data flow and dependency management, such as machine learning pipelines, data processing workflows, etc.

Args:
    post_action (callable, optional): A function to be called after the graph execution is complete. Defaults to ``None``.
    auto_capture (bool, optional): Whether to automatically capture variables from context. Defaults to ``False``.
    kwargs: Arbitrary keyword arguments representing named nodes and corresponding functions.

**Returns:**

- The final output result of the graph.
"""

    start_node_name, end_node_name = '__start__', '__end__'

    class Node:
        def __init__(self, func, name, arg_names=None):
            self.func, self.name, self.arg_names = func, name, None
            self.inputs, self.outputs = dict(), []

        def __repr__(self): return lazyllm.make_repr('Flow', 'Node', name=self.name)

    def __init__(self, *, post_action=None, auto_capture=False, **kw):
        super(__class__, self).__init__(post_action=post_action, auto_capture=auto_capture, **kw)

    def __post_init__(self):
        self._nodes = {n: Graph.Node(f, n) for f, n in zip(self._items, self._item_names)}
        self._nodes[Graph.start_node_name] = Graph.Node(None, Graph.start_node_name)
        self._nodes[Graph.end_node_name] = Graph.Node(lazyllm.Identity(), Graph.end_node_name)
        self._in_degree = {node: 0 for node in self._nodes.values()}
        self._out_degree = {node: 0 for node in self._nodes.values()}
        self._sorted_nodes = None
        self._constants = []

    def set_node_arg_name(self, arg_names):
        """Set the argument names for nodes.

This method is used to set the names of function arguments for nodes in the graph, which is important for correct invocation of multi-parameter functions.

Args:
    arg_names (list): List of argument names, corresponding to the order when nodes were created.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda a, b: a + b
    ...     g.multiply = lambda x, y: x * y
    >>> g.set_node_arg_name([['x', 'y'], ['a', 'b']])
    >>> g._nodes['add'].arg_names
    ['x', 'y']
    >>> g._nodes['multiply'].arg_names
    ['a', 'b']
    """
        for node_name, name in zip(self._item_names, arg_names):
            self._nodes[node_name].arg_names = name

    @property
    def start_node(self):
        """Get the start node of the graph.

**Returns:**

- Node: The start node (__start__) object of the graph.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.process = lambda x: x * 2
    >>> start = g.start_node
    >>> start.name
    '__start__'
    """
        return self._nodes[Graph.start_node_name]

    @property
    def end_node(self):
        """Get the end node of the graph.

**Returns:**

- Node: The end node (__end__) object of the graph.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.process = lambda x: x * 2
    >>> end = g.end_node
    >>> end.name
    '__end__'
    """
        return self._nodes[Graph.end_node_name]

    def add_edge(self, from_node, to_node, formatter=None):
        """Add an edge to the graph, defining data flow between nodes.

This method is used to define connection relationships between nodes in the graph, specifying how data flows from one node to another.

Args:
    from_node (str or Node): The name or Node object of the source node.
    to_node (str or Node): The name or Node object of the target node.
    formatter (callable, optional): Optional formatting function for data transformation during transfer. Defaults to ``None``.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.node1 = lambda x: x * 2
    ...     g.node2 = lambda x: x + 1
    ...     g.node3 = lambda x, y: x + y
    >>> g.add_edge('__start__', 'node1')
    >>> g.add_edge('node1', 'node2')
    >>> g.add_edge('node3', '__end__')
    >>> g._nodes['node1'].outputs
    [<Flow type=Node name=node2>]
    >>> def double_input(data):
    ...     return data * 2
    >>> g.add_edge('node1', 'node3', formatter=double_input)
    >>> g._nodes['node3'].inputs
    {'node1': <function double_input at ...>}
    """
        if isinstance(from_node, (tuple, list)):
            return [self.add_edge(f, to_node, formatter) for f in from_node]
        if isinstance(to_node, (tuple, list)):
            return [self.add_edge(from_node, t, formatter) for t in to_node]

        if isinstance(from_node, str): from_node = self._nodes[from_node]
        if isinstance(to_node, str): to_node = self._nodes[to_node]
        from_node.outputs.append(to_node)
        assert from_node.name not in to_node.inputs, f'Duplicate edges from {from_node.name} to {to_node.name}'
        to_node.inputs[from_node.name] = formatter
        self._in_degree[to_node] += 1
        self._out_degree[from_node] += 1

    def add_const_edge(self, constant, to_node):
        """Add a constant edge that passes a fixed value to a specified node.

This method is used to pass constant values as input to nodes in the graph without needing to get data from other nodes.

Args:
    constant: The constant value to pass.
    to_node (str or Node): The name or Node object of the target node.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda x, y: x + y
    >>> g.add_const_edge(10, 'add')
    >>> g._constants
    [10]
    """
        if isinstance(to_node, (tuple, list)):
            return [self.add_const_edge(constant, t) for t in to_node]
        if isinstance(to_node, str): to_node = self._nodes[to_node]
        to_node.inputs[f'_lazyllm_constant_{len(self._constants)}'] = None
        self._constants.append(constant)

    def topological_sort(self):
        """Perform topological sorting to return the correct node execution order.

This method uses Kahn's algorithm to perform topological sorting on the directed acyclic graph, ensuring all dependencies are satisfied.

**Returns:**

- List[Node]: List of nodes arranged in topological order.

Raises:
- ValueError: If there are circular dependencies in the graph.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.node1 = lambda x: x * 2
    ...     g.node2 = lambda x: x + 1
    ...     g.node3 = lambda x, y: x + y
    >>> g.add_edge('__start__', 'node1')
    >>> g.add_edge('node1', 'node2')
    >>> g.add_edge('node1', 'node3')
    >>> g.add_edge('node2', 'node3')
    >>> g.add_edge('node3', '__end__')
    >>> sorted_nodes = g.topological_sort()
    >>> [node.name for node in sorted_nodes]
    ['__start__', 'node1', 'node2', 'node3', '__end__']
    >>> g.add_edge('node3', 'node1')
    >>> try:
    ...     g.topological_sort()
    ... except ValueError as e:
    ...     print("检测到循环依赖")
    检测到循环依赖
    """
        in_degree = self._in_degree.copy()
        queue = deque([node for node in self._nodes.values() if in_degree[node] == 0])
        sorted_nodes: List[Graph.Node] = []

        while queue:
            node = queue.popleft()
            sorted_nodes.append(node)
            for output_node in node.outputs:
                in_degree[output_node] -= 1
                if in_degree[output_node] == 0:
                    queue.append(output_node)

        if len(sorted_nodes) != len(self._nodes):
            raise ValueError('Graph has a cycle')

        return [n for n in sorted_nodes if (self._in_degree[n] > 0 or self._out_degree[n] > 0)]

    def _get_input(self, name, node, intermediate_results, futures):
        if name.startswith('_lazyllm_constant_'):
            return self._constants[int(name.removeprefix('_lazyllm_constant_'))]
        if name not in intermediate_results['values']:
            r = futures[name].result()
            with intermediate_results['lock']:
                if name not in intermediate_results['values']:
                    intermediate_results['values'][name] = r
        r = intermediate_results['values'][name]
        if isinstance(r, Exception): raise r
        if node.inputs[name]:
            if isinstance(r, arguments) and not ((len(r.args) == 0) ^ (len(r.kw) == 0)):
                raise RuntimeError('Only one of args and kwargs can be given with formatter.')
            r = node.inputs[name]((r.args or r.kw) if isinstance(r, arguments) else r)
        return r

    def compute_node(self, sid, node, intermediate_results, futures):
        """Compute the output result of a single node.

This is an internal method of the graph, used to execute the computation of a single node, including getting input data, applying formatter functions, calling node functions, etc.

Args:
    sid: Session ID.
    node (Node): The node to compute.
    intermediate_results (dict): Intermediate result storage.
    futures (dict): Async task dictionary.

**Returns:**

- The computation result of the node.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda x, y: x + y
    ...     g.multiply = lambda x: x * 2
    >>> g.add_edge('__start__', 'add')
    >>> g.add_const_edge(5, 'add')
    >>> g.add_edge('add', 'multiply')
    >>> g.add_edge('multiply', '__end__')
    >>> result = g(3)  # x=3, y=5 (常量)
    >>> result
    16
    """
        globals._init_sid(sid)

        kw = {}
        if len(node.inputs) == 1:
            input = self._get_input(list(node.inputs.keys())[0], node, intermediate_results, futures)
        else:
            # TODO(wangzhihong): add complex rules: mixture of package / support kwargs / ...
            inputs = package(self._get_input(input, node, intermediate_results, futures)
                             for input in node.inputs.keys())
            input = arguments()
            for inp in inputs:
                input.append(inp)

        if isinstance(input, arguments):
            kw = input.kw
            input = input.args

        if node.arg_names:
            if not isinstance(input, (list, tuple, package)): input = [input]
            kw.update({name: value for name, value in zip(node.arg_names, input)})
            input = package()

        return self.invoke(node.func, input, **kw)

    def _run(self, __input, **kw):
        if not self._sorted_nodes: self._sorted_nodes = self.topological_sort()
        intermediate_results = dict(lock=threading.Lock(), values={})

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {}

            for node in self._sorted_nodes:
                if node.name == '__start__':
                    intermediate_results['values'][node.name] = (
                        arguments(__input, kw) if (__input and kw) else (kw or __input))
                else:
                    future = executor.submit(
                        copy_context().run,
                        partial(self.compute_node, globals._sid, node, intermediate_results, futures),
                    )
                    futures[node.name] = future

        return futures[Graph.end_node_name].result()

end_node property

Get the end node of the graph.

Returns:

  • Node: The end node (end) object of the graph.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.process = lambda x: x * 2
>>> end = g.end_node
>>> end.name
'__end__'

start_node property

Get the start node of the graph.

Returns:

  • Node: The start node (start) object of the graph.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.process = lambda x: x * 2
>>> start = g.start_node
>>> start.name
'__start__'

add_const_edge(constant, to_node)

Add a constant edge that passes a fixed value to a specified node.

This method is used to pass constant values as input to nodes in the graph without needing to get data from other nodes.

Parameters:

  • constant

    The constant value to pass.

  • to_node (str or Node) –

    The name or Node object of the target node.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.add = lambda x, y: x + y
>>> g.add_const_edge(10, 'add')
>>> g._constants
[10]
Source code in lazyllm/flow/flow.py
    def add_const_edge(self, constant, to_node):
        """Add a constant edge that passes a fixed value to a specified node.

This method is used to pass constant values as input to nodes in the graph without needing to get data from other nodes.

Args:
    constant: The constant value to pass.
    to_node (str or Node): The name or Node object of the target node.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda x, y: x + y
    >>> g.add_const_edge(10, 'add')
    >>> g._constants
    [10]
    """
        if isinstance(to_node, (tuple, list)):
            return [self.add_const_edge(constant, t) for t in to_node]
        if isinstance(to_node, str): to_node = self._nodes[to_node]
        to_node.inputs[f'_lazyllm_constant_{len(self._constants)}'] = None
        self._constants.append(constant)

add_edge(from_node, to_node, formatter=None)

Add an edge to the graph, defining data flow between nodes.

This method is used to define connection relationships between nodes in the graph, specifying how data flows from one node to another.

Parameters:

  • from_node (str or Node) –

    The name or Node object of the source node.

  • to_node (str or Node) –

    The name or Node object of the target node.

  • formatter (callable, default: None ) –

    Optional formatting function for data transformation during transfer. Defaults to None.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.node1 = lambda x: x * 2
...     g.node2 = lambda x: x + 1
...     g.node3 = lambda x, y: x + y
>>> g.add_edge('__start__', 'node1')
>>> g.add_edge('node1', 'node2')
>>> g.add_edge('node3', '__end__')
>>> g._nodes['node1'].outputs
[<Flow type=Node name=node2>]
>>> def double_input(data):
...     return data * 2
>>> g.add_edge('node1', 'node3', formatter=double_input)
>>> g._nodes['node3'].inputs
{'node1': <function double_input at ...>}
Source code in lazyllm/flow/flow.py
    def add_edge(self, from_node, to_node, formatter=None):
        """Add an edge to the graph, defining data flow between nodes.

This method is used to define connection relationships between nodes in the graph, specifying how data flows from one node to another.

Args:
    from_node (str or Node): The name or Node object of the source node.
    to_node (str or Node): The name or Node object of the target node.
    formatter (callable, optional): Optional formatting function for data transformation during transfer. Defaults to ``None``.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.node1 = lambda x: x * 2
    ...     g.node2 = lambda x: x + 1
    ...     g.node3 = lambda x, y: x + y
    >>> g.add_edge('__start__', 'node1')
    >>> g.add_edge('node1', 'node2')
    >>> g.add_edge('node3', '__end__')
    >>> g._nodes['node1'].outputs
    [<Flow type=Node name=node2>]
    >>> def double_input(data):
    ...     return data * 2
    >>> g.add_edge('node1', 'node3', formatter=double_input)
    >>> g._nodes['node3'].inputs
    {'node1': <function double_input at ...>}
    """
        if isinstance(from_node, (tuple, list)):
            return [self.add_edge(f, to_node, formatter) for f in from_node]
        if isinstance(to_node, (tuple, list)):
            return [self.add_edge(from_node, t, formatter) for t in to_node]

        if isinstance(from_node, str): from_node = self._nodes[from_node]
        if isinstance(to_node, str): to_node = self._nodes[to_node]
        from_node.outputs.append(to_node)
        assert from_node.name not in to_node.inputs, f'Duplicate edges from {from_node.name} to {to_node.name}'
        to_node.inputs[from_node.name] = formatter
        self._in_degree[to_node] += 1
        self._out_degree[from_node] += 1

compute_node(sid, node, intermediate_results, futures)

Compute the output result of a single node.

This is an internal method of the graph, used to execute the computation of a single node, including getting input data, applying formatter functions, calling node functions, etc.

Parameters:

  • sid

    Session ID.

  • node (Node) –

    The node to compute.

  • intermediate_results (dict) –

    Intermediate result storage.

  • futures (dict) –

    Async task dictionary.

Returns:

  • The computation result of the node.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.add = lambda x, y: x + y
...     g.multiply = lambda x: x * 2
>>> g.add_edge('__start__', 'add')
>>> g.add_const_edge(5, 'add')
>>> g.add_edge('add', 'multiply')
>>> g.add_edge('multiply', '__end__')
>>> result = g(3)  # x=3, y=5 (常量)
>>> result
16
Source code in lazyllm/flow/flow.py
    def compute_node(self, sid, node, intermediate_results, futures):
        """Compute the output result of a single node.

This is an internal method of the graph, used to execute the computation of a single node, including getting input data, applying formatter functions, calling node functions, etc.

Args:
    sid: Session ID.
    node (Node): The node to compute.
    intermediate_results (dict): Intermediate result storage.
    futures (dict): Async task dictionary.

**Returns:**

- The computation result of the node.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda x, y: x + y
    ...     g.multiply = lambda x: x * 2
    >>> g.add_edge('__start__', 'add')
    >>> g.add_const_edge(5, 'add')
    >>> g.add_edge('add', 'multiply')
    >>> g.add_edge('multiply', '__end__')
    >>> result = g(3)  # x=3, y=5 (常量)
    >>> result
    16
    """
        globals._init_sid(sid)

        kw = {}
        if len(node.inputs) == 1:
            input = self._get_input(list(node.inputs.keys())[0], node, intermediate_results, futures)
        else:
            # TODO(wangzhihong): add complex rules: mixture of package / support kwargs / ...
            inputs = package(self._get_input(input, node, intermediate_results, futures)
                             for input in node.inputs.keys())
            input = arguments()
            for inp in inputs:
                input.append(inp)

        if isinstance(input, arguments):
            kw = input.kw
            input = input.args

        if node.arg_names:
            if not isinstance(input, (list, tuple, package)): input = [input]
            kw.update({name: value for name, value in zip(node.arg_names, input)})
            input = package()

        return self.invoke(node.func, input, **kw)

set_node_arg_name(arg_names)

Set the argument names for nodes.

This method is used to set the names of function arguments for nodes in the graph, which is important for correct invocation of multi-parameter functions.

Parameters:

  • arg_names (list) –

    List of argument names, corresponding to the order when nodes were created.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.add = lambda a, b: a + b
...     g.multiply = lambda x, y: x * y
>>> g.set_node_arg_name([['x', 'y'], ['a', 'b']])
>>> g._nodes['add'].arg_names
['x', 'y']
>>> g._nodes['multiply'].arg_names
['a', 'b']
Source code in lazyllm/flow/flow.py
    def set_node_arg_name(self, arg_names):
        """Set the argument names for nodes.

This method is used to set the names of function arguments for nodes in the graph, which is important for correct invocation of multi-parameter functions.

Args:
    arg_names (list): List of argument names, corresponding to the order when nodes were created.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.add = lambda a, b: a + b
    ...     g.multiply = lambda x, y: x * y
    >>> g.set_node_arg_name([['x', 'y'], ['a', 'b']])
    >>> g._nodes['add'].arg_names
    ['x', 'y']
    >>> g._nodes['multiply'].arg_names
    ['a', 'b']
    """
        for node_name, name in zip(self._item_names, arg_names):
            self._nodes[node_name].arg_names = name

topological_sort()

Perform topological sorting to return the correct node execution order.

This method uses Kahn's algorithm to perform topological sorting on the directed acyclic graph, ensuring all dependencies are satisfied.

Returns:

  • List[Node]: List of nodes arranged in topological order.

Raises: - ValueError: If there are circular dependencies in the graph.

Examples:

>>> import lazyllm
>>> with lazyllm.graph() as g:
...     g.node1 = lambda x: x * 2
...     g.node2 = lambda x: x + 1
...     g.node3 = lambda x, y: x + y
>>> g.add_edge('__start__', 'node1')
>>> g.add_edge('node1', 'node2')
>>> g.add_edge('node1', 'node3')
>>> g.add_edge('node2', 'node3')
>>> g.add_edge('node3', '__end__')
>>> sorted_nodes = g.topological_sort()
>>> [node.name for node in sorted_nodes]
['__start__', 'node1', 'node2', 'node3', '__end__']
>>> g.add_edge('node3', 'node1')
>>> try:
...     g.topological_sort()
... except ValueError as e:
...     print("检测到循环依赖")
检测到循环依赖
Source code in lazyllm/flow/flow.py
    def topological_sort(self):
        """Perform topological sorting to return the correct node execution order.

This method uses Kahn's algorithm to perform topological sorting on the directed acyclic graph, ensuring all dependencies are satisfied.

**Returns:**

- List[Node]: List of nodes arranged in topological order.

Raises:
- ValueError: If there are circular dependencies in the graph.


Examples:
    >>> import lazyllm
    >>> with lazyllm.graph() as g:
    ...     g.node1 = lambda x: x * 2
    ...     g.node2 = lambda x: x + 1
    ...     g.node3 = lambda x, y: x + y
    >>> g.add_edge('__start__', 'node1')
    >>> g.add_edge('node1', 'node2')
    >>> g.add_edge('node1', 'node3')
    >>> g.add_edge('node2', 'node3')
    >>> g.add_edge('node3', '__end__')
    >>> sorted_nodes = g.topological_sort()
    >>> [node.name for node in sorted_nodes]
    ['__start__', 'node1', 'node2', 'node3', '__end__']
    >>> g.add_edge('node3', 'node1')
    >>> try:
    ...     g.topological_sort()
    ... except ValueError as e:
    ...     print("检测到循环依赖")
    检测到循环依赖
    """
        in_degree = self._in_degree.copy()
        queue = deque([node for node in self._nodes.values() if in_degree[node] == 0])
        sorted_nodes: List[Graph.Node] = []

        while queue:
            node = queue.popleft()
            sorted_nodes.append(node)
            for output_node in node.outputs:
                in_degree[output_node] -= 1
                if in_degree[output_node] == 0:
                    queue.append(output_node)

        if len(sorted_nodes) != len(self._nodes):
            raise ValueError('Graph has a cycle')

        return [n for n in sorted_nodes if (self._in_degree[n] > 0 or self._out_degree[n] > 0)]