Skip to content

Common

Register

lazyllm.common.Register

Bases: object

LazyLLM provides a registration mechanism for Components, allowing any function to be registered as a Component of LazyLLM. The registered functions can be indexed at any location through the grouping mechanism provided by the registrar, without the need for explicit import.

lazyllm.components.register(cls, *, rewrite_func)→ Decorator

After the function is called, it returns a decorator which wraps the decorated function into a Component and registers it in a group named cls.

Parameters:

  • base (type) –

    Base class

  • fnames (Union[str, List[str]]) –

    Function name or function name list to rewrite

  • template (str, default: None ) –

    Registration template string, defaults to standard registration template

  • default_group (str, default: None ) –

    Default group name, defaults to None

Examples:

>>> import lazyllm
>>> @lazyllm.component_register('mygroup')
... def myfunc(input):
...    return input
...
>>> lazyllm.mygroup.myfunc()(1)
1
>>> @lazyllm.component_register.cmd('mygroup')
... def mycmdfunc(input):
...     return f'echo {input}'
...
>>> lazyllm.mygroup.mycmdfunc()(1)
PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) Command: echo 1
PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) PID: 1
Source code in lazyllm/common/registry.py
class Register(object):
    """LazyLLM provides a registration mechanism for Components, allowing any function to be registered as a Component of LazyLLM. The registered functions can be indexed at any location through the grouping mechanism provided by the registrar, without the need for explicit import.

    <span style="font-size: 18px;">&ensp;**`lazyllm.components.register(cls, *, rewrite_func)→ Decorator`**</span>

    After the function is called, it returns a decorator which wraps the decorated function into a Component and registers it in a group named cls.

    Args:
        base (type): Base class; the base class of every target group must be a subclass of it.
        fnames (Union[str, List[str]]): Function name or function name list to rewrite. The first name is the
            default rewrite target unless the group base class defines ``__reg_overwrite__``.
        template (str, optional): Deprecated and ignored; passing a value only emits a DeprecationWarning.
        default_group (str, optional): Group used when the register is applied directly to a function,
            defaults to None.
        allowed_parameter (Union[str, List[str]], optional): Names of the extra keyword arguments accepted by
            the register call; each accepted keyword is set as an attribute on the registered class.

    Raises:
        TypeError: If ``allowed_parameter`` is neither None, a str nor a list.


    Examples:
        >>> import lazyllm
        >>> @lazyllm.component_register('mygroup')
        ... def myfunc(input):
        ...    return input
        ...
        >>> lazyllm.mygroup.myfunc()(1)
        1
        >>> @lazyllm.component_register.cmd('mygroup')
        ... def mycmdfunc(input):
        ...     return f'echo {input}'
        ...
        >>> lazyllm.mygroup.mycmdfunc()(1)
        PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) Command: echo 1
        PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) PID: 1
    """
    def __init__(self, base, fnames, template: Optional[str] = None, default_group: Optional[str] = None,
                 allowed_parameter: Optional[Union[str, List[str]]] = None):
        if template is not None:
            warnings.warn('The "template" parameter of Register is deprecated and ignored.',
                          DeprecationWarning, stacklevel=2)
        self.basecls = base
        self.fnames = [fnames] if isinstance(fnames, str) else fnames
        self._default_group = default_group
        # Normalise allowed_parameter to a set of names for cheap membership tests in __call__.
        if isinstance(allowed_parameter, str):
            self._allowed_parameter = {allowed_parameter}
        elif isinstance(allowed_parameter, list):
            assert all(isinstance(p, str) for p in allowed_parameter), 'allowed_parameter must be list of str'
            self._allowed_parameter = set(allowed_parameter)
        elif allowed_parameter is None:
            self._allowed_parameter = set()
        else:
            raise TypeError('allowed_parameter must be str or list[str]')
        assert len(self.fnames) > 0, 'At least one function should be given for overwrite.'

    def _wrap(self, cls, *, rewrite_func=None, **kwargs):
        """Build the decorator that registers a function into group ``cls``.

        Args:
            cls (Union[str, type]): Group name, or a class whose ``__name__`` is used as the group name.
                A ``LazyLLM<Name>Base`` style name is reduced to ``<Name>``.
            rewrite_func (str, optional): Name of the method the function is bound to. Defaults to the
                group base's ``__reg_overwrite__`` when set, otherwise to the first entry of ``fnames``.
            **kwargs: Extra attributes set on the registered class.

        Returns:
            Callable: ``impl(func, func_name=None)``, which performs the registration and returns ``func``
            (or a renaming wrapper around it when ``func_name`` is given).
        """
        cls = cls.__name__ if isinstance(cls, type) else cls
        cls = re.match('(LazyLLM)(.*)(Base)', cls.split('.')[-1])[2] \
            if (cls.startswith('LazyLLM') and cls.endswith('Base')) else cls
        base = _get_base_cls_from_registry(cls.lower())
        assert issubclass(base, self.basecls)
        if rewrite_func is None:
            rewrite_func = base.__reg_overwrite__ if getattr(base, '__reg_overwrite__', None) else self.fnames[0]
        assert rewrite_func in self.fnames, f'Invalid function "{rewrite_func}" provived for rewrite.'

        def impl(func, func_name=None):
            if func_name:
                func_for_wrapper = func  # avoid calling recursively

                @functools.wraps(func)
                def wrapper_func(*args, **kwargs):
                    return func_for_wrapper(*args, **kwargs)

                wrapper_func.__name__ = func_name
                func = wrapper_func
            else:
                func_name = func.__name__
            base_cls = LazyLLMRegisterMetaClass.all_clses[cls.lower()].base
            # The class is created only for its side effect; presumably the metaclass records it in
            # `all_clses`, from where it is fetched on the next line -- TODO confirm against the metaclass.
            type(func_name + cls.split('.')[-1].capitalize(), (base_cls,), {})
            f = LazyLLMRegisterMetaClass.all_clses[cls.lower()].__getattr__(func_name)

            # Support multiprocessing: Register the class in the module where the function is defined
            if func.__module__ in sys.modules:
                setattr(sys.modules[func.__module__], f.__name__, f)
                f.__module__ = func.__module__

            f.__name__ = func_name
            setattr(f, rewrite_func, bind_to_instance(func))
            for k, v in kwargs.items():
                setattr(f, k, v)
            return func
        return impl

    def __call__(self, f, *, rewrite_func=None, **kwargs):
        """Register ``f`` directly, or return a decorator for the group ``f``.

        Args:
            f: Either a group (str or type), in which case a decorator is returned, or the function to
                register into ``default_group``.
            rewrite_func (str, optional): Name of the method to overwrite.
            **kwargs: Extra attributes for the registered class; every key must be in ``allowed_parameter``.

        Returns:
            Callable: The decorator when ``f`` is a group, otherwise the registered function.
        """
        assert all(k in self._allowed_parameter for k in kwargs.keys()), \
            f'Only allowed parameters: {self._allowed_parameter}, but got {kwargs.keys()}'
        if not isinstance(f, (str, type)):
            assert self._default_group, 'default_group is not set, please set it by your register decorator'
            # Forward rewrite_func/kwargs so that `@register.<fname>` applied straight to a function
            # overwrites the requested method instead of silently falling back to the default one.
            return self._wrap(self._default_group, rewrite_func=rewrite_func, **kwargs)(f)
        return self._wrap(f, rewrite_func=rewrite_func, **kwargs)

    def __getattr__(self, name):
        """Expose every name in ``fnames`` as a shortcut for ``self(cls, rewrite_func=name)``.

        Raises:
            AttributeError: If ``name`` is not one of the rewritable function names.
        """
        # Read `fnames` from the instance dict: on a half-built instance (copy / unpickle probes such as
        # `__setstate__`) `self.fnames` would re-enter __getattr__ and recurse without end.
        if name not in self.__dict__.get('fnames', ()):
            raise AttributeError(f'class {self.__class__} has no attribute {name}')

        def impl(cls):
            return self(cls, rewrite_func=name)
        return impl

    def new_group(self, group_name):
        """
        Creates a new ComponentGroup. The newly created group will be automatically added to __builtin__ and can be accessed at any location without the need for import.

        Args:
            group_name (str): The name of the group to be created.
        """
        return type(f'LazyLLM{group_name}Base', (self.basecls,), {})

new_group(group_name)

Creates a new ComponentGroup. The newly created group will be automatically added to `__builtin__` and can be accessed at any location without the need for import.

Parameters:

  • group_name (str) –

    The name of the group to be created.

Source code in lazyllm/common/registry.py
    def new_group(self, group_name):
        """
Creates a new ComponentGroup. The newly created group will be automatically added to __builtin__ and can be accessed at any location without the need for import.

Args:
    group_name (str): The name of the group to be created.
"""
        return type(f'LazyLLM{group_name}Base', (self.basecls,), {})

lazyllm.common.registry.LazyDict

Bases: dict

A special dictionary class designed for lazy programmers. Supports various convenient access and operation methods.

Features:

  1. Use dot notation instead of ['str'] to access dictionary elements
  2. Support lowercase first character to make statements more like function calls
  3. Support direct calls when dictionary has only one element
  4. Support dynamic default keys
  5. Allow omitting group name if it appears in the name

Parameters:

  • name (str, default: '' ) –

    Name of the dictionary, defaults to empty string.

  • base

    Base class reference, defaults to None.

  • *args

    Positional arguments passed to dict parent class.

  • **kw

    Keyword arguments passed to dict parent class.

Source code in lazyllm/common/registry.py
class LazyDict(dict):
    """A special dictionary class designed for lazy programmers. Supports various convenient access and operation methods.

    Features:

    1. Use dot notation instead of ['str'] to access dictionary elements
    2. Support lowercase first character to make statements more like function calls
    3. Support direct calls when dictionary has only one element
    4. Support dynamic default keys
    5. Allow omitting group name if it appears in the name

    Args:
        name (str): Name of the dictionary, defaults to empty string.
        base: Base class reference, defaults to None.
        *args: Positional arguments passed to dict parent class.
        **kw: Keyword arguments passed to dict parent class.
    """
    def __init__(self, name='', base=None, *args, **kw):
        super(__class__, self).__init__(*args, **kw)
        self._default: Optional[str] = None
        self.name = name.capitalize()
        self.base = base

    def __setitem__(self, key, value):
        """Store ``value`` under the lower-cased ``key``; a dotted key is delegated to the nested group."""
        key = key.lower()
        assert key != 'default', 'LazyDict do not support key: default'
        if '.' in key:
            grp, key = key.rsplit('.', 1)
            return self[grp].__setitem__(key, value)
        return super().__setitem__(key, value)

    def __getitem__(self, key):
        """Look up the lower-cased ``key``; a dotted key is resolved one group at a time."""
        key = key.lower()
        if '.' in key:
            grp, key = key.split('.', 1)
            return self[grp][key]
        return super().__getitem__(key)

    # default -> self.default
    # key -> Key, keyName, KeyName
    # if self.name ends with 's' or 'es', ignore it
    def _match(self, key: str):
        """Resolve ``key`` to a key that is actually stored in the dictionary.

        Candidates are tried in a fixed order: the key itself, the key followed by the dictionary name,
        then (for names ending in 's'/'es') the key followed by the singular name.

        Raises:
            AttributeError: If no candidate is stored in the dictionary.
        """
        key = key.lower()
        if key == 'default':
            assert self._default or len(self) > 0, 'No default key set'
            key = self._default or list(self.keys())[0]
        keys = [key, f'{key}{self.name}', f'{key}{self.name.lower()}']
        if self.name.endswith('s'):
            n = 2 if self.name.endswith('es') else 1
            keys.extend([f'{key}{self.name[:-n]}', f'{key}{self.name[:-n].lower()}'])

        # De-duplicate while keeping priority order, so the winner does not depend on hash ordering
        # when several candidates are stored.
        for k in dict.fromkeys(keys):
            # `self.keys()` on purpose: `k in self` would call the overridden __contains__,
            # which itself calls _match.
            if k in self.keys():
                return k
        raise AttributeError(f'Attr {key} not found in `{self.name}: {self}`, conditates: {keys}')

    def __getattr__(self, key):
        # These are real instance attributes; reaching here means the instance is not initialised yet
        # (e.g. while copy/pickle rebuilds it). Fail fast, otherwise _match -> self.name -> __getattr__
        # would recurse without end.
        if key in ('name', 'base', '_default'):
            raise AttributeError(key)
        return self[self._match(key)]

    def remove(self, key):
        """Remove the specified key-value pair from the dictionary.

        Args:
            key (str): The key to remove. Supports the same key matching rules as __getattr__,
                      including lowercase first character and group name omission features.

        Note:
            Raises AttributeError if no matching key is found.
        """
        super(__class__, self).pop(self._match(key))

    def __call__(self, *args, **kwargs):
        """Call the default element, or the only element when no default is set."""
        assert self._default is not None or len(self.keys()) == 1
        return (self.default if self._default else self[list(self.keys())[0]])(*args, **kwargs)

    def set_default(self, key: str):
        """Set the default key for the dictionary. After setting, the value can be accessed through the .default property.

        Args:
            key (str): The key name to set as default.

        Note:
            - key must be a string type
            - After setting, can be accessed via .default, or called directly when dictionary has only one element
        """
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

    def __contains__(self, key):
        """Return True when ``key`` resolves to a stored element under the attribute matching rules."""
        try:
            _ = self[self._match(key)]
            return True
        except (AttributeError, KeyError):
            return False

remove(key)

Remove the specified key-value pair from the dictionary.

Parameters:

  • key (str) –

    The key to remove. Supports the same key matching rules as `__getattr__`, including lowercase first character and group name omission features.

Note

Raises AttributeError if no matching key is found.

Source code in lazyllm/common/registry.py
    def remove(self, key):
        """Remove the specified key-value pair from the dictionary.

Args:
    key (str): The key to remove. Supports the same key matching rules as __getattr__, 
              including lowercase first character and group name omission features.

Note:
    Raises AttributeError if no matching key is found.
"""
        super(__class__, self).pop(self._match(key))

set_default(key)

Set the default key for the dictionary. After setting, the value can be accessed through the .default property.

Parameters:

  • key (str) –

    The key name to set as default.

Note
  • key must be a string type
  • After setting, can be accessed via .default, or called directly when dictionary has only one element
Source code in lazyllm/common/registry.py
    def set_default(self, key: str):
        """Set the default key for the dictionary. After setting, the value can be accessed through the .default property.

Args:
    key (str): The key name to set as default.

Note:
    - key must be a string type
    - After setting, can be accessed via .default, or called directly when dictionary has only one element
"""
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

lazyllm.common.common.ResultCollector

Bases: object

A result collector used to store and access results by name during the execution of a flow or task.
Calling the instance with a name returns a callable Impl object that collects results for that name.
Useful for scenarios where intermediate results need to be shared across steps.

Source code in lazyllm/common/common.py
class ResultCollector(object):
    """Collect results by name while a flow or task is running.

    Calling the collector with a name returns a callable ``Impl`` object; calling that object stores
    what it receives under the name. Stored results can then be read back by name, which is useful
    when intermediate results need to be shared across steps.
    """
    class Impl(object):
        """Callable bound to one result name and to the collector's shared storage."""
        def __init__(self, name, value):
            self._name = name
            self._value = value

        def __call__(self, *args, **kw):
            # Exactly one of positional / keyword arguments must be supplied.
            n_args, n_kw = len(args), len(kw)
            assert (n_args == 0) ^ (n_kw == 0), f'args({n_args}), kwargs({n_kw})'
            assert self._name is not None
            if n_args == 0:
                self._value[self._name] = kw
                return kwargs(kw)
            # A single positional value is stored as-is, several are bundled into a package.
            self._value[self._name] = args[0] if n_args == 1 else package(*args)
            return self._value[self._name]

    def __init__(self):
        self._value = dict()

    def __call__(self, name):
        return ResultCollector.Impl(name, self._value)

    def __getitem__(self, name):
        return self._value[name]

    def __repr__(self):
        return repr(self._value)

    def keys(self):
        """Get all stored result names.

        **Returns:**

        - KeysView[str]: A set-like object containing result names.
        """
        stored = self._value
        return stored.keys()

    def items(self):
        """Get all stored (name, value) pairs.

        **Returns:**

        - ItemsView[str, Any]: A set-like object containing name-value pairs of results.
        """
        stored = self._value
        return stored.items()

items()

Get all stored (name, value) pairs.

Returns:

  • ItemsView[str, Any]: A set-like object containing name-value pairs of results.
Source code in lazyllm/common/common.py
    def items(self):
        """Get all stored (name, value) pairs.

**Returns:**

- ItemsView[str, Any]: A set-like object containing name-value pairs of results.
"""
        return self._value.items()

keys()

Get all stored result names.

Returns:

  • KeysView[str]: A set-like object containing result names.
Source code in lazyllm/common/common.py
    def keys(self):
        """Get all stored result names.

**Returns:**

- KeysView[str]: A set-like object containing result names.
"""
        return self._value.keys()

lazyllm.common.common.EnvVarContextManager

Environment variable context manager used to temporarily set environment variables during the execution of a code block, automatically restoring original environment variables upon exit.

Parameters:

  • env_vars_dict (dict) –

    Dictionary of environment variables to temporarily set; variables with None values are ignored.

Source code in lazyllm/common/common.py
class EnvVarContextManager:
    """Environment variable context manager used to temporarily set environment variables during the execution of a code block, automatically restoring original environment variables upon exit.

    Args:
        env_vars_dict (dict): Dictionary of environment variables to temporarily set; variables with None values are ignored.
    """
    def __init__(self, env_vars_dict):
        self.env_vars_dict = {var: value for var, value in env_vars_dict.items() if value is not None}
        self.original_values = {}

    def __enter__(self):
        """Apply the temporary variables, remembering the values they replace."""
        # Start from a clean snapshot so that a re-used manager never restores values
        # recorded during an earlier `with` block.
        self.original_values = {}
        for var, value in self.env_vars_dict.items():
            if var in os.environ:
                self.original_values[var] = os.environ[var]
            os.environ[var] = value
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore replaced variables and drop the ones that did not exist before entering."""
        for var in self.env_vars_dict:
            if var in self.original_values:
                os.environ[var] = self.original_values[var]
            else:
                # The body may already have removed the variable; that must not raise here.
                os.environ.pop(var, None)

Bind

lazyllm.common.bind

Bind

Bases: object

The Bind class provides function binding and deferred invocation capabilities, supporting dynamic argument passing and context-based argument resolution for flexible function composition and pipeline-style calls.

The bind function binds a callable with fixed positional and keyword arguments, supporting placeholders (e.g. _0, _1) to reference outputs of upstream nodes within the current pipeline, enabling flexible data jumps and function composition.

Notes
  • Bound arguments can be concrete values or placeholders referring to upstream pipeline outputs.
  • Bindings are local to the current pipeline context and do not support cross-pipeline or external variable binding.

Parameters:

  • __bind_func (Callable or type, default: _None ) –

    The function or function type to bind. If a type is given, it will be instantiated automatically.

  • *args

    Fixed positional arguments to bind, supporting placeholders.

  • **kw

    Fixed keyword arguments to bind, supporting placeholders.

Examples:

>>> from lazyllm import bind, _0, _1
>>> def f1(x):
...     return x ** 2
>>> def f21(input1, input2=0):
...     return input1 + input2 + 1
>>> def f22(input1, input2=0):
...     return input1 + input2 - 1
>>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"
>>> from lazyllm import pipeline, parallel
>>> with pipeline() as ppl:
...     ppl.f1 = f1
...     with parallel() as ppl.subprl2:
...         ppl.subprl2.path1 = f21
...         ppl.subprl2.path2 = f22
...     ppl.f3 = bind(f3, ppl.input, _0, _1)
...
>>> print(ppl(2))
get [input:2], [f21:5], [f22:3]
>>> # Demonstrate operator '|' overloading for bind
>>> with pipeline() as ppl2:
...     ppl2.f1 = f1
...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
...         ppl2.subprl2.path1 = f21
...         ppl2.subprl2.path2 = f22
...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
...
>>> print(ppl2(2))
get [input:2], [f21:7], [f22:5]
Source code in lazyllm/common/bind.py
class Bind(object):
    """The Bind class provides function binding and deferred invocation capabilities, supporting dynamic argument passing and context-based argument resolution for flexible function composition and pipeline-style calls.

    The bind function binds a callable with fixed positional and keyword arguments, supporting placeholders (e.g. _0, _1) to reference outputs of upstream nodes within the current pipeline, enabling flexible data jumps and function composition.

    Notes:
        - Bound arguments can be concrete values or placeholders referring to upstream pipeline outputs.
        - Bindings are local to the current pipeline context and do not support cross-pipeline or external variable binding.

    Args:
        __bind_func (Callable or type): The function or function type to bind. If a type is given, it will be instantiated automatically.
        *args: Fixed positional arguments to bind, supporting placeholders.
        **kw: Fixed keyword arguments to bind, supporting placeholders.


    Examples:
        >>> from lazyllm import bind, _0, _1
        >>> def f1(x):
        ...     return x ** 2
        >>> def f21(input1, input2=0):
        ...     return input1 + input2 + 1
        >>> def f22(input1, input2=0):
        ...     return input1 + input2 - 1
        >>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
        ...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"

        >>> from lazyllm import pipeline, parallel

        >>> with pipeline() as ppl:
        ...     ppl.f1 = f1
        ...     with parallel() as ppl.subprl2:
        ...         ppl.subprl2.path1 = f21
        ...         ppl.subprl2.path2 = f22
        ...     ppl.f3 = bind(f3, ppl.input, _0, _1)
        ...
        >>> print(ppl(2))
        get [input:2], [f21:5], [f22:3]

        >>> # Demonstrate operator '|' overloading for bind
        >>> with pipeline() as ppl2:
        ...     ppl2.f1 = f1
        ...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
        ...         ppl2.subprl2.path1 = f21
        ...         ppl2.subprl2.path2 = f22
        ...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
        ...
        >>> print(ppl2(2))
        get [input:2], [f21:7], [f22:5]
    """
    # Sentinel meaning "no function bound yet" (None is treated separately in __call__).
    class _None: pass

    class Args(object):
        """Deferred reference to a value recorded for a source (``source_id``) under ``target_id``.

        Indexing (``args[key]``) or attribute access (``args.name``) on the reference is recorded and
        applied to the resolved value when ``get_arg`` runs.
        """
        # Sentinel meaning "no item / attribute access was recorded".
        class _None: pass
        # Marker type: a resolved package that must be spread into several positional arguments.
        class Unpack(package): pass

        def __init__(self, source_id: str, target_id: str = 'input', *, unpack: bool = False):
            self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
            self._source_id, self._target_id = source_id, target_id
            self._unpack = unpack

        def __getitem__(self, key: str):
            self._item_key = key
            return self

        def __getattr__(self, key: str):
            # Dunder probes (copy, pickle, ...) must fail normally instead of being recorded.
            if key.startswith('__') and key.endswith('__'):
                raise AttributeError(f'Args has no attribute {key}')
            self._attr_key = key
            return self

        def __getstate__(self):
            # `_unpack` has to travel with the state: if it were missing after a copy/unpickle,
            # reading `self._unpack` would fall into __getattr__ and overwrite `_attr_key`.
            return self._item_key, self._attr_key, self._source_id, self._target_id, self._unpack

        def __setstate__(self, state):
            # States written before `_unpack` was serialised have only four fields.
            self._item_key, self._attr_key, self._source_id, self._target_id = state[:4]
            self._unpack = state[4] if len(state) > 4 else False

        def get_arg(self, source):
            """Resolve this reference against ``source`` (or the recorded bind args) and return the value.

            Raises:
                RuntimeError: If no record matching ``source_id`` can be found.
            """
            if not source or source['source'] != self._source_id:
                # Fall back to the bind args recorded in the project-level `locals` storage.
                if self._source_id in locals['bind_args']:
                    source = locals['bind_args'][self._source_id]
                    if not source or source['source'] != self._source_id:
                        LOG.error(f'Get source failed, source is {source}, and expect id is {self._source_id}')
                        raise RuntimeError('Internal Error, please report issue to `https://github.com/LazyAGI/LazyLLM`')
                else:
                    LOG.error(f'Get source failed, locals is {locals["bind_args"]} with id {locals._sid}, '
                              f'and expect id is {self._source_id}')
                    raise RuntimeError('Unable to find the bound parameter, possibly due to pipeline.input/output can '
                                       'only be bind in direct member of pipeline! You may solve this by defining the '
                                       'pipeline in a `with lazyllm.save_pipeline_result():` block.')
            input = result = source[self._target_id]
            source = source['source']
            if self._item_key is not Bind.Args._None: result = input[self._item_key]
            elif self._attr_key is not Bind.Args._None: result = getattr(input, self._attr_key)
            if self._unpack and isinstance(result, package): result = Bind.Args.Unpack(result)
            return result

        def __repr__(self):
            return f'<class \'lazyllm.common.bind.Bind.Args\' source={self._source_id}>'

    def __init__(self, __bind_func=_None, *args, **kw):
        # `_hooks` is set through object.__setattr__ so that it exists before any attribute
        # forwarding to the wrapped function can take place.
        object.__setattr__(self, '_hooks', [])
        self._f = __bind_func() if isinstance(__bind_func, type) and __bind_func is not Bind._None else __bind_func
        self._args = args
        self._kw = kw

    def _wraps_plain_callable(self) -> bool:
        """Return True when the wrapped object is callable and has neither `_flow_id` nor `_module_id`."""
        inner = getattr(self, '_f', None)
        if inner is None or not callable(inner):
            return False
        return not (hasattr(inner, '_flow_id') or hasattr(inner, '_module_id'))

    def __or__(self, other):
        if isinstance(other, Bind):
            return other.__ror__(self)
        # Raise the error; returning the exception instance would hand callers a bogus result.
        raise NotImplementedError('Only support `binded-func | bind()` syntax!')

    def __ror__(self, __value: Callable):
        # `func | bind(a, b)`: what was passed as the bound function is really the first argument.
        if self._f is not Bind._None: self._args = (self._f,) + self._args
        self._f = __value
        return self

    # _bind_args_source: dict(input=input, args=dict(key=value))
    def __call__(self, *args, _bind_args_source=None, **kw):
        """Resolve placeholders and deferred references, then call the wrapped function.

        Args:
            *args: Runtime positional inputs. They are forwarded as-is when nothing positional was
                bound, otherwise they are only used to fill ``Placeholder`` slots.
            _bind_args_source: Record passed to ``Bind.Args.get_arg`` to resolve deferred references.
            **kw: Runtime keyword arguments; they must not collide with bound keywords.

        Returns:
            The wrapped function's result, or None when the wrapped function is None.
        """
        if self._f is None: return None
        keys = set(kw.keys()).intersection(set(self._kw.keys()))
        assert len(keys) == 0, f'Keys `{keys}` are already bind!'
        bind_args = args if len(self._args) == 0 else (
            [args[a.idx] if isinstance(a, Placeholder) else a for a in self._args])
        kwargs = {k: args[v.idx] if isinstance(v, Placeholder) else v for k, v in self._kw.items()}
        bind_args = [a.get_arg(_bind_args_source) if isinstance(a, Bind.Args) else a for a in bind_args]
        # Spread every Unpack marker into individual positional arguments.
        bind_args = list(itertools.chain.from_iterable(x if isinstance(x, Bind.Args.Unpack) else [x] for x in bind_args))
        kwargs = {k: v.get_arg(_bind_args_source) if isinstance(v, Bind.Args) else v for k, v in kwargs.items()}
        if self._hooks:
            from lazyllm.hook import execution_with_hooks
            merged = {**kwargs, **kw}
            return execution_with_hooks(self, *args, **merged)(self._f)(*bind_args, **merged)
        return self._f(*bind_args, **kwargs, **kw)

    # TODO: modify it
    def __repr__(self) -> str:
        return self._f.__repr__() + '(bind args: {})'.format(
            ', '.join([repr(a) if a is not self else 'self' for a in self._args] + [
                f'{k}={repr(v) if v is not self else "self"}' for k, v in self._kw.items()]))

    def __getattr__(self, name):
        # name will be '_f' in copy.deepcopy
        if name != '_f':
            return getattr(self._f, name)
        return super(__class__, self).__getattr__(name)

    def __setattr__(self, __name: str, __value: Any) -> None:
        # Only Bind's own bookkeeping attributes live on the Bind object; every other
        # assignment is forwarded to the wrapped function.
        if __name not in ('_f', '_args', '_kw', '_has_root', '_hooks'):
            return setattr(self._f, __name, __value)
        return super(__class__, self).__setattr__(__name, __value)

    def __eq__(self, value):
        return value == self._f

    def bind(self, *args, **kw):
        """Wrap this Bind in a new Bind carrying additional bound arguments."""
        return Bind(self, *args, **kw)

Package

lazyllm.common.package

Bases: tuple

The package class is used to encapsulate the return values of pipeline or parallel modules, ensuring automatic unpacking when passing to the next module, thereby supporting flexible multi-value passing.

Examples:

>>> from lazyllm.common import package
>>> p = package(1, 2, 3)
>>> p
(1, 2, 3)
>>> p[1]
2
>>> p_slice = p[1:]
>>> isinstance(p_slice, package)
True
>>> p2 = package([4, 5])
>>> p + p2
(1, 2, 3, 4, 5)
Source code in lazyllm/common/common.py
class package(tuple):
    """Tuple subclass that wraps the return values of pipeline or parallel modules.

    Wrapping several values in a package lets them be unpacked automatically when they are passed on to
    the next module, which supports flexible multi-value passing.


    Examples:
        >>> from lazyllm.common import package
        >>> p = package(1, 2, 3)
        >>> p
        (1, 2, 3)
        >>> p[1]
        2
        >>> p_slice = p[1:]
        >>> isinstance(p_slice, package)
        True
        >>> p2 = package([4, 5])
        >>> p + p2
        (1, 2, 3, 4, 5)
    """
    def __new__(cls, *args):
        # A lone tuple / list / generator argument supplies the items; anything else is
        # taken as the items themselves.
        single_iterable = len(args) == 1 and isinstance(args[0], (tuple, list, types.GeneratorType))
        items = args[0] if single_iterable else args
        return super().__new__(cls, items)

    def __getitem__(self, key):
        # Slices stay packages, single items are returned untouched.
        picked = super().__getitem__(key)
        return package(picked) if isinstance(key, slice) else picked

    def __add__(self, __other):
        joined = super().__add__(__other)
        return package(joined)

Identity

lazyllm.common.Identity

Identity module that directly returns the input as output.

This module serves as a no-op placeholder in composition pipelines. If multiple inputs are provided, they are packed together before returning.

Parameters:

  • *args

    Optional positional arguments for placeholder compatibility.

  • **kw

    Optional keyword arguments for placeholder compatibility.

Source code in lazyllm/common/common.py
class Identity():
    """
    Pass-through module: whatever goes in comes out.

    It serves as a no-op placeholder in composition pipelines. A single input is returned unchanged;
    any other number of inputs is packed together into a package before returning.

    Args:
        *args: Optional positional arguments for placeholder compatibility.
        **kw: Optional keyword arguments for placeholder compatibility.
    """
    def __init__(self, *args, **kw):
        # Constructor arguments are accepted only for compatibility and are not used.
        pass

    def __call__(self, *inputs):
        return inputs[0] if len(inputs) == 1 else package(*inputs)

    def __repr__(self):
        return make_repr('Module', 'Identity')

Compilation

lazyllm.common.compile_func(func_code, global_env=None)

Compile a Python function string into an executable function and return it.

Parameters:

  • func_code (str) –

    A string containing Python function code

  • global_env (Optional[Dict[str, Any]], default: None ) –

    Packages and global variables used in the Python function

Examples:

from lazyllm.common import compile_func
code_str = 'def Identity(v): return v'
identity = compile_func(code_str)
assert identity('hello') == 'hello'
Source code in lazyllm/common/utils.py
def compile_func(func_code: str, global_env: Optional[Dict[str, Any]] = None) -> Callable:
    """
    Compile a Python function string into an executable function and return it.

    The name of the first ``def`` found in the code decides which object is returned. The parsed code
    is passed through ``SecurityVisitor`` before it is compiled and executed.

    Args:
        func_code (str): A string containing Python function code
        global_env (Optional[Dict[str, Any]]): Packages and global variables used in the Python function


    Examples:

        from lazyllm.common import compile_func
        code_str = 'def Identity(v): return v'
        identity = compile_func(code_str)
        assert identity('hello') == 'hello'
    """
    target_name = re.search(r'def\s+(\w+)\s*\(', func_code).group(1)
    tree = ast.parse(func_code)
    SecurityVisitor().visit(tree)
    code_obj = compile(tree, filename='<ast>', mode='exec')
    namespace = {}
    if global_env is None:
        # Without a caller-supplied environment the same dict serves as globals and locals.
        exec(code_obj, namespace, namespace)
    else:
        exec(code_obj, global_env, namespace)
    return namespace.pop(target_name)

Queue

lazyllm.common.FileSystemQueue

Bases: ABC

Abstract base class for file system-based queues.

FileSystemQueue is an abstract base class that provides a file system-based queue operation interface. It supports multiple backend implementations (such as SQLite, Redis) for message passing and data flow control in distributed environments.

This class implements the singleton pattern, ensuring only one queue instance per class name, and provides thread-safe queue operations.

Parameters:

  • klass (str, default: '__default__' ) –

    Class name identifier for the queue. Defaults to '__default__'.

Returns:

  • FileSystemQueue: Queue instance (singleton pattern)
Source code in lazyllm/common/queue.py
class FileSystemQueue(ABC):
    """Abstract base class for file system-based queues.

    FileSystemQueue is an abstract base class that provides a file system-based queue operation
    interface. It supports multiple backend implementations (such as SQLite, Redis) for message
    passing and data flow control in distributed environments.

    Instances are pooled per ``klass`` name: constructing a queue with a name that already
    exists in the pool returns the existing instance. Creation of pooled instances is guarded
    by a re-entrant lock.

    Args:
        klass (str, optional): Class name identifier for the queue. Defaults to ``'__default__'``.

    **Returns:**

    - FileSystemQueue: Queue instance (one per ``klass`` name)
    """

    # Pool of queue instances keyed by their ``klass`` name, shared by all subclasses.
    __queue_pool__ = dict()
    # Re-entrant because creating the default queue re-enters ``__new__`` while the lock is held.
    __queue_pool_lock__ = threading.RLock()

    def __init__(self, *, klass='__default__'):
        super().__init__()
        self._class = klass

    def __new__(cls, *args, **kw):
        # Subclasses such as SQLiteQueue and RedisQueue accept ``klass`` as their first
        # positional argument, so the pool key must honour it too; otherwise
        # ``SQLiteQueue('x')`` would be pooled under '__default__'.
        klass = kw.get('klass', args[0] if args else '__default__')
        if klass not in __class__.__queue_pool__:
            with __class__.__queue_pool_lock__:
                # Re-check under the lock: another thread may have created it meanwhile.
                if klass not in __class__.__queue_pool__:
                    if cls is __class__:
                        # The abstract base delegates to the backend set via ``set_default``.
                        __class__.__queue_pool__[klass] = cls.__default_queue__(*args, **kw)
                    else:
                        __class__.__queue_pool__[klass] = super().__new__(cls)
        return __class__.__queue_pool__[klass]

    @classmethod
    def get_instance(cls, klass):
        """Get the queue instance for the specified class name.

        This method returns the queue object bound to the given class name. If the class name
        has not been registered, it will be initialized automatically.

        Args:
            klass (str): Queue class name identifier, must not be ``'__default__'``.

        **Returns:**

        - FileSystemQueue: Queue instance bound to the specified class name.

        Raises:
            AssertionError: If ``klass`` is not a string or equals ``'__default__'``.
        """
        assert isinstance(klass, str) and klass != '__default__'
        return cls(klass=klass)

    @classmethod
    def set_default(cls, queue: Type):
        """Set the default queue implementation.

        This method specifies the default queue class, which is instantiated when
        ``FileSystemQueue`` itself (rather than a concrete subclass) is constructed.

        Args:
            queue (Type): Default queue class.
        """
        cls.__default_queue__ = queue

    @property
    def sid(self):
        # NOTE(review): `globals` is presumably the project's global-state object imported at
        # module level (not the builtin); the storage id combines its `_sid` with the klass name.
        return f'{globals._sid}-{self._class}'

    def enqueue(self, message):
        """Add a message to the queue.

        This method adds the specified message to the tail of the queue, following the
        First-In-First-Out (FIFO) principle.

        Args:
            message: The message content to be added to the queue.


        Examples:
            >>> import lazyllm
            >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
            >>> queue.enqueue(123)
            >>> queue.peek()
            '123'
        """
        return self._enqueue(self.sid, message)

    def dequeue(self, limit=None):
        """Retrieve messages from the queue.

        This method retrieves messages from the head of the queue and removes them, with the
        option to specify the number of messages to retrieve at once.

        Args:
            limit (int, optional): Maximum number of messages to retrieve at once. If None,
                retrieves all messages. Defaults to None.

        **Returns:**

        - list: List of retrieved messages.


        Examples:
            >>> import lazyllm
            >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
            >>> for i in range(5):
            ...     queue.enqueue(f"Message{i}")
            >>> all_messages = queue.dequeue()
            >>> all_messages
            ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
        """
        return self._dequeue(self.sid, limit=limit)

    def peek(self):
        """Retrieve the next message in the queue without removing it.

        **Returns:**

        - Any: The next available message in the queue, or ``None`` if the queue is empty.


        Examples:
            >>> import lazyllm
            >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
            >>> queue.enqueue("First message")
            >>> queue.enqueue("Second message")
            >>> first_message = queue.peek()
            >>> first_message
            'First message'
            >>> queue.peek()
            'First message'
        """
        return self._peek(self.sid)

    def size(self):
        """Get the number of messages in the queue.

        **Returns:**

        - int: The current number of messages in the queue.


        Examples:
            >>> import lazyllm
            >>> queue = lazyllm.FileSystemQueue(klass='size_test')
            >>> queue.size()
            0
            >>> queue.enqueue("Message1")
            >>> queue.size()
            1
            >>> queue.enqueue("Message2")
            >>> queue.size()
            2
            >>> queue.dequeue()
            ['Message1', 'Message2']
            >>> queue.size()
            0
        """
        return self._size(self.sid)

    def init(self):
        """Initialize the queue.

        This method clears all messages in the current queue, equivalent to calling ``clear()``.
        """
        self.clear()

    def clear(self):
        """Clear the queue.

        Removes all messages from the queue, resetting it to an empty state.


        Examples:
            >>> import lazyllm
            >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
            >>> for i in range(10):
            ...     queue.enqueue(f"Message{i}")
            >>> queue.size()
            10
            >>> queue.clear()
            >>> queue.size()
            0
            >>> queue.peek() is None
            True
        """
        self._clear(self.sid)

    # Backend hooks: every concrete queue implements these against the storage id ``id``.
    @abstractmethod
    def _enqueue(self, id, message): pass

    @abstractmethod
    def _dequeue(self, id, limit=None): pass

    @abstractmethod
    def _peek(self, id): pass

    @abstractmethod
    def _size(self, id): pass

    @abstractmethod
    def _clear(self, id): pass

clear()

Clear the queue.

Removes all messages from the queue, resetting it to an empty state.

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='clear_test')
>>> for i in range(10):
...     queue.enqueue(f"Message{i}")
>>> queue.size()
10
>>> queue.clear()
>>> queue.size()
0
>>> queue.peek() is None
True
Source code in lazyllm/common/queue.py
    def clear(self):
        """Clear the queue.

Removes all messages from the queue, resetting it to an empty state.


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
    >>> for i in range(10):
    ...     queue.enqueue(f"Message{i}")
    >>> queue.size()
    10
    >>> queue.clear()
    >>> queue.size()
    0
    >>> queue.peek() is None
    True
    """
        self._clear(self.sid)

dequeue(limit=None)

Retrieve messages from the queue.

This method retrieves messages from the head of the queue and removes them, with the option to specify the number of messages to retrieve at once.

Parameters:

  • limit (int, default: None ) –

    Maximum number of messages to retrieve at once. If None, retrieves all messages. Defaults to None.

Returns:

  • list: List of retrieved messages.

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
>>> for i in range(5):
...     queue.enqueue(f"Message{i}")
>>> all_messages = queue.dequeue()
>>> all_messages
['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
Source code in lazyllm/common/queue.py
    def dequeue(self, limit=None):
        """Retrieve messages from the queue.

This method retrieves messages from the head of the queue and removes them, with the option to specify the number of messages to retrieve at once.

Args:
    limit (int, optional): Maximum number of messages to retrieve at once. If None, retrieves all messages. Defaults to None.

**Returns:**

- list: List of retrieved messages.


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
    >>> for i in range(5):
    ...     queue.enqueue(f"Message{i}")
    >>> all_messages = queue.dequeue()
    >>> all_messages
    ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
    """
        return self._dequeue(self.sid, limit=limit)

enqueue(message)

Add a message to the queue.

This method adds the specified message to the tail of the queue, following the First-In-First-Out (FIFO) principle.

Parameters:

  • message

    The message content to be added to the queue.

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
>>> queue.enqueue(123)
>>> queue.peek()
'123'
Source code in lazyllm/common/queue.py
    def enqueue(self, message):
        """Add a message to the queue.

This method adds the specified message to the tail of the queue, following the First-In-First-Out (FIFO) principle.

Args:
    message: The message content to be added to the queue.


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
    >>> queue.enqueue(123)
    >>> queue.peek()
    '123'
    """
        return self._enqueue(self.sid, message)

get_instance(klass) classmethod

Get the queue instance for the specified class name.

This method returns the queue object bound to the given class name. If the class name has not been registered, it will be initialized automatically.

Parameters:

  • klass (str) –

    Queue class name identifier, must not be '__default__'.

Returns:

  • FileSystemQueue: Queue instance bound to the specified class name.
Source code in lazyllm/common/queue.py
    @classmethod
    def get_instance(cls, klass):
        """Get the queue instance for the specified class name.

This method returns the queue object bound to the given class name. If the class name has not been registered, it will be initialized automatically.

Args:
    klass (str): Queue class name identifier, must not be ``'__default__'``.

**Returns:**

- FileSystemQueue: Queue instance bound to the specified class name.
"""
        assert isinstance(klass, str) and klass != '__default__'
        return cls(klass=klass)

init()

Initialize the queue.

This method clears all messages in the current queue, equivalent to calling clear().

Source code in lazyllm/common/queue.py
    def init(self):
        """Initialize the queue.

        Delegates to ``clear()``, so any messages currently in this queue are removed.
        """
        self.clear()

peek()

Retrieve the next message in the queue without removing it.

Returns:

  • Any: The next available message in the queue, or None if the queue is empty.

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='peek_test')
>>> queue.enqueue("First message")
>>> queue.enqueue("Second message")
>>> first_message = queue.peek()
>>> first_message
'First message'
>>> queue.peek()
'First message'
Source code in lazyllm/common/queue.py
    def peek(self):
        """Retrieve the next message in the queue without removing it.

**Returns:**

- Any: The next available message in the queue, or ``None`` if the queue is empty.


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
    >>> queue.enqueue("First message")
    >>> queue.enqueue("Second message")
    >>> first_message = queue.peek()
    >>> first_message
    'First message'
    >>> queue.peek()
    'First message'
    """
        return self._peek(self.sid)

set_default(queue) classmethod

Set the default queue implementation.

This method specifies the default queue class, used as the backend implementation when klass is not provided.

Parameters:

  • queue (Type) –

    Default queue class.

Source code in lazyllm/common/queue.py
    @classmethod
    def set_default(cls, queue: Type):
        """Set the default queue implementation.

This method specifies the default queue class, used as the backend implementation when `klass` is not provided.

Args:
    queue (Type): Default queue class.
"""
        cls.__default_queue__ = queue

size()

Get the number of messages in the queue.

Returns:

  • int: The current number of messages in the queue.

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='size_test')
>>> queue.size()
0
>>> queue.enqueue("Message1")
>>> queue.size()
1
>>> queue.enqueue("Message2")
>>> queue.size()
2
>>> queue.dequeue()
['Message1', 'Message2']
>>> queue.size()
0
Source code in lazyllm/common/queue.py
    def size(self):
        """Get the number of messages in the queue.

**Returns:**

- int: The current number of messages in the queue.


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='size_test')
    >>> queue.size()
    0
    >>> queue.enqueue("Message1")
    >>> queue.size()
    1
    >>> queue.enqueue("Message2")
    >>> queue.size()
    2
    >>> queue.dequeue()
    ['Message1', 'Message2']
    >>> queue.size()
    0
    """
        return self._size(self.sid)

lazyllm.common.multiprocessing.SpawnProcess

Bases: Process

Source code in lazyllm/common/multiprocessing.py
class SpawnProcess(multiprocessing.Process):
    """A ``multiprocessing.Process`` whose ``start()`` runs inside ``_ctx('spawn')``."""

    def start(self):
        """Start the process using the spawn method.

        The parent ``start()`` is invoked inside the ``_ctx('spawn')`` context manager.
        Spawn creates a brand new Python interpreter process, which is safer than fork,
        especially in multi-threaded environments.

        **Notes:**
        - Uses spawn method to start new process, avoiding potential issues with fork
        - ``_ctx`` (defined elsewhere in the module) is expected to switch the start method
          only for the duration of the call and restore the previous one afterwards
        - Returns whatever ``multiprocessing.Process.start()`` returns


        Examples:

            ```python
            from lazyllm.common.multiprocessing import SpawnProcess

            def worker():
                print("Worker process running")

            # Create and start a process using spawn method
            process = SpawnProcess(target=worker)
            process.start()
            process.join()
            ```
        """
        spawn_ctx = _ctx('spawn')
        with spawn_ctx:
            return super().start()

start()

Start the process using spawn method.

This method forces the use of spawn method when starting the process, which creates a brand new Python interpreter process. Spawn is safer than fork, especially in multi-threaded environments.

Notes:

  • Uses spawn method to start new process, avoiding potential issues with fork
  • Temporarily switches to spawn method and restores original method after execution
  • Inherits all functionality from multiprocessing.Process.start()

Examples:

```python
from lazyllm.common.multiprocessing import SpawnProcess

def worker():
    print("Worker process running")

# Create and start a process using spawn method
process = SpawnProcess(target=worker)
process.start()
process.join()
```
Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """Start the process using the spawn method.

        The parent ``start()`` is invoked inside the ``_ctx('spawn')`` context manager.
        Spawn creates a brand new Python interpreter process, which is safer than fork,
        especially in multi-threaded environments.

        **Notes:**
        - Uses spawn method to start new process, avoiding potential issues with fork
        - ``_ctx`` (defined elsewhere in the module) is expected to switch the start method
          only for the duration of the call and restore the previous one afterwards
        - Returns whatever ``multiprocessing.Process.start()`` returns


        Examples:

            ```python
            from lazyllm.common.multiprocessing import SpawnProcess

            def worker():
                print("Worker process running")

            # Create and start a process using spawn method
            process = SpawnProcess(target=worker)
            process.start()
            process.join()
            ```
        """
        spawn_ctx = _ctx('spawn')
        with spawn_ctx:
            return super().start()

lazyllm.common.queue.SQLiteQueue

Bases: FileSystemQueue

Persistent file system queue backed by SQLite. This class extends FileSystemQueue and stores queue data in an SQLite database. Messages are ordered by a position field to preserve FIFO behavior. The class supports concurrent-safe operations including enqueue, dequeue, peek, size checking, and clearing the queue. The queue database is saved at ~/.lazyllm_filesystem_queue.db, with a file lock mechanism ensuring safe access in multi-process environments.

Parameters:

  • klass (str, default: '__default__' ) –

    Name of the queue category used to logically separate queues. Default is '__default__'.

Source code in lazyllm/common/queue.py
class SQLiteQueue(FileSystemQueue):
    """Persistent file system queue backed by SQLite.

    This class extends FileSystemQueue and stores queue data in an SQLite database. Messages
    are ordered by a position field to preserve FIFO behavior. Every operation (enqueue,
    dequeue, peek, size, clear) runs while holding a file lock placed next to the database.
    The database file is ``.lazyllm_filesystem_queue.db`` inside the directory given by
    ``config['home']``.

    Args:
        klass (str): Name of the queue category used to logically separate queues.
            Default is '__default__'.
    """
    # Serialises first-time initialisation; instances are pooled, so ``__init__`` can be
    # invoked repeatedly (and concurrently) on the same object.
    _init_lock = threading.Lock()

    def __init__(self, klass='__default__'):
        if getattr(self, '_initialized', False):
            return
        with self._init_lock:
            # Re-check under the lock: another thread may have finished initialising.
            if getattr(self, '_initialized', False):
                return
            super(__class__, self).__init__(klass=klass)
            self.db_path = os.path.expanduser(os.path.join(config['home'], '.lazyllm_filesystem_queue.db'))
            lock_kwargs = {}
            # Only pass `is_singleton` when the installed SoftFileLock accepts it.
            if 'is_singleton' in inspect.signature(SoftFileLock).parameters:
                lock_kwargs['is_singleton'] = True
            self._lock = SoftFileLock(self.db_path + '.lock', **lock_kwargs)
            self._check_same_thread = not sqlite3_check_threadsafety()
            try:
                self._initialize_db()
            except Exception:
                # Do not leave a half-initialised instance in the shared pool.
                with FileSystemQueue.__queue_pool_lock__:
                    if FileSystemQueue.__queue_pool__.get(klass) is self:
                        FileSystemQueue.__queue_pool__.pop(klass, None)
                raise
            self._initialized = True

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit.

        ``with connection:`` on a sqlite3 connection only commits or rolls back; it never
        closes the connection. Wrapping it in ``closing`` releases the handle while the file
        lock is still held.
        """
        from contextlib import closing  # local import keeps this fix self-contained
        return closing(sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread))

    def _initialize_db(self):
        """Create the ``queue`` table if it does not exist yet."""
        # The inner ``conn`` context commits on success and rolls back on error.
        with self._lock, self._connect() as conn, conn:
            conn.execute('''
            CREATE TABLE IF NOT EXISTS queue (
                id TEXT NOT NULL,
                position INTEGER NOT NULL,
                message TEXT NOT NULL,
                PRIMARY KEY (id, position)
            )
            ''')

    def _enqueue(self, id, message):
        """Append ``message`` to queue ``id`` at the next free position."""
        with self._lock, self._connect() as conn, conn:
            max_pos = conn.execute('''
            SELECT MAX(position) FROM queue WHERE id = ?
            ''', (id,)).fetchone()[0]
            # An empty queue has no maximum; numbering (re)starts at 0.
            next_pos = 0 if max_pos is None else max_pos + 1
            conn.execute('''
            INSERT INTO queue (id, position, message)
            VALUES (?, ?, ?)
            ''', (id, next_pos, message))

    def _dequeue(self, id, limit=None):
        """Retrieve and remove messages from the head of queue ``id``.

        Args:
            id: Storage id of the queue.
            limit (int, optional): Maximum number of messages to take. A falsy value
                (``None`` or ``0``) takes every message.

        **Returns:**

        - list: The removed messages in FIFO order; empty if the queue is empty.
        """
        with self._lock, self._connect() as conn, conn:
            if limit:
                cursor = conn.execute('SELECT message, position FROM queue WHERE id = ? '
                                      'ORDER BY position ASC LIMIT ?', (id, limit))
            else:
                cursor = conn.execute('SELECT message, position FROM queue WHERE id = ? '
                                      'ORDER BY position ASC', (id,))
            rows = cursor.fetchall()
            if not rows:
                return []
            # Rows are the lowest positions in ascending order and (id, position) is unique,
            # so "position <= last selected" matches exactly the rows read above. This keeps
            # the statement fully parameterised instead of string-building an IN (...) list.
            conn.execute('DELETE FROM queue WHERE id = ? AND position <= ?', (id, rows[-1][1]))
            return [row[0] for row in rows]

    def _peek(self, id):
        """Return the oldest message of queue ``id`` without removing it, or ``None``."""
        with self._lock, self._connect() as conn, conn:
            row = conn.execute('''
            SELECT message FROM queue WHERE id = ? ORDER BY position ASC LIMIT 1
            ''', (id,)).fetchone()
            return None if row is None else row[0]

    def _size(self, id):
        """Return the number of messages stored for queue ``id``."""
        with self._lock, self._connect() as conn, conn:
            return conn.execute('''
            SELECT COUNT(*) FROM queue WHERE id = ?
            ''', (id,)).fetchone()[0]

    def _clear(self, id):
        """Delete every message stored for queue ``id``."""
        with self._lock, self._connect() as conn, conn:
            conn.execute('''
            DELETE FROM queue WHERE id = ?
            ''', (id,))

lazyllm.common.ReadOnlyWrapper

Bases: object

A lightweight read-only wrapper that holds an arbitrary object and exposes its attributes. It supports swapping the internal object dynamically and provides utility for checking emptiness. Note: it does not enforce deep immutability, but deepcopy drops the wrapped object.

Parameters:

  • obj (Optional[Any], default: None ) –

    The initial wrapped object, defaults to None.

Source code in lazyllm/common/common.py
class ReadOnlyWrapper(object):
    """A lightweight read-only wrapper that holds an arbitrary object and exposes its attributes.

    Attribute lookups that fail on the wrapper itself are forwarded to the wrapped object. The
    wrapped object can be swapped with ``set`` and checked for emptiness with ``isNone``.
    Note: it does not enforce deep immutability, but deepcopy drops the wrapped object.

    Args:
        obj (Optional[Any]): The initial wrapped object, defaults to None.
    """
    def __init__(self, obj=None):
        self.obj = obj

    def set(self, obj):
        """Replace the currently wrapped internal object.

        Args:
            obj (Any): New object to wrap.
        """
        self.obj = obj

    def __getattr__(self, key):
        # Only invoked when normal lookup fails.
        # key will be 'obj' in copy.deepcopy, where the instance exists before `obj` is set;
        # that lookup must never be delegated or it would recurse.
        if key != 'obj' and self.obj is not None:
            return getattr(self.obj, key)
        # `object` defines no `__getattr__`, so delegating to super() only raised by accident
        # and reported "'super' object has no attribute '__getattr__'". Raise the real error.
        raise AttributeError(f'{type(self).__name__!r} object has no attribute {key!r}')

    # TODO: modify it
    def __repr__(self):
        r = self.obj.__repr__()
        # Turn '<Foo ...>' into '<Foo ...(Readonly)>' and a plain repr into '<repr(Readonly)>'.
        return (f'{r[:-1]}' if r.endswith('>') else f'<{r}') + '(Readonly)>'

    def __deepcopy__(self, memo):
        # drop obj
        return ReadOnlyWrapper()

    def isNone(self):
        """Check whether the wrapper currently holds no object.

        **Returns:**

        - bool: True if the internal object is None, otherwise False.
        """
        return self.obj is None

isNone()

Check whether the wrapper currently holds no object.

Returns:

  • bool: True if the internal object is None, otherwise False.
Source code in lazyllm/common/common.py
    def isNone(self):
        """
Check whether the wrapper currently holds no object.

Args:
    None.

**Returns:**

- bool: True if the internal object is None, otherwise False.
"""
        return self.obj is None

set(obj)

Replace the currently wrapped internal object.

Parameters:

  • obj (Any) –

    New object to wrap.

Source code in lazyllm/common/common.py
    def set(self, obj):
        """
Replace the currently wrapped internal object.

Args:
    obj (Any): New object to wrap.
"""
        self.obj = obj

lazyllm.common.queue.RedisQueue

Bases: FileSystemQueue

Redis-backed file system queue (inherits from FileSystemQueue) for cross-process/node message passing and queue management. It initializes its underlying storage using a configured Redis URL and employs thread-safe setup logic.

Parameters:

  • klass (str, default: '__default__' ) –

    Classification name for the queue instance to distinguish different queues. Defaults to '__default__'.

Source code in lazyllm/common/queue.py
class RedisQueue(FileSystemQueue):
    """Redis-backed file system queue (inherits from FileSystemQueue).

    Each queue is a Redis list stored under the queue's storage id. Index 0 of the list is a
    placeholder element (initially ``'<start>'``) that keeps the key alive while the queue is
    empty; real messages start at index 1. The Redis URL is read from
    ``config['fsqredis_url']`` and every operation runs under a per-instance thread lock.

    Args:
        klass (str): Classification name for the queue instance to distinguish different
            queues. Defaults to '__default__'.
    """
    def __init__(self, klass='__default__'):
        super(__class__, self).__init__(klass=klass)
        self.redis_url = config['fsqredis_url']
        self._lock = threading.Lock()
        self._initialize_db()

    def _initialize_db(self):
        """Verify connectivity and seed the placeholder element for this queue's id."""
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            assert (
                conn.ping()
            ), 'Found fsque reids config but can not connect, please check your config `LAZYLLM_FSQREDIS_URL`.'
            if not conn.exists(self.sid):
                conn.rpush(self.sid, '<start>')

    def _enqueue(self, id, message):
        """Append ``message`` to the tail of list ``id``."""
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            conn.rpush(id, message)

    def _dequeue(self, id, limit=None):
        """Retrieve and remove messages from the head of queue ``id``.

        Args:
            id: Storage id (Redis key) of the queue.
            limit (int, optional): Maximum number of messages to take. A falsy value
                (``None`` or ``0``) takes every message.

        **Returns:**

        - list: The removed messages, decoded as UTF-8, in FIFO order.
        """
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            # LRANGE bounds are inclusive and index 0 is the placeholder, so indices
            # 1..limit are exactly `limit` messages; -1 means "through the tail".
            vals = conn.lrange(id, 1, limit if limit else -1)
            if not vals:
                return []
            # Drop exactly what was read. The last consumed element slides into index 0 and
            # becomes the new placeholder, so the list is never emptied (which would delete
            # the key) and messages appended after the LRANGE are left in place.
            conn.ltrim(id, len(vals), -1)
            return [val.decode('utf-8') for val in vals]

    def _peek(self, id):
        """Return the oldest message of queue ``id`` without removing it, or ``None``."""
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            val = conn.lindex(id, 1)  # index 0 is the placeholder
            if val is None:
                return None
            return val.decode('utf-8')

    def _size(self, id):
        """Return the number of messages in queue ``id`` (placeholder excluded)."""
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            rsize = conn.llen(id)
            # empty : [ <start> ]; a missing key has length 0 and must not report -1.
            return max(rsize - 1, 0)

    def _clear(self, id):
        """Remove every message of queue ``id`` and re-seed the placeholder element."""
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            # Deleting the key alone would also remove the placeholder: size() would become
            # -1 and the next enqueued message would be mistaken for the placeholder and
            # never delivered. Delete and re-seed in one MULTI/EXEC transaction.
            pipe = conn.pipeline()
            pipe.delete(id)
            pipe.rpush(id, '<start>')
            pipe.execute()

Multiprocessing

lazyllm.common.ForkProcess

Bases: Process

Enhanced process class provided by LazyLLM, inheriting from Python's standard library multiprocessing.Process. This class specifically uses the fork start method to create child processes and provides support for synchronous/asynchronous execution modes.

Parameters:

  • group

    Process group, default to None

  • target

    Function to be executed in the process, default to None

  • name

    Process name, default to None

  • args

    Tuple of arguments to pass to the target function, default to ()

  • kwargs

    Dictionary of keyword arguments to pass to the target function, default to {}

  • daemon

    Whether the process is a daemon process, default to None

  • sync

    Whether to use synchronous mode, default to True. In synchronous mode, the process automatically exits after executing the target function; in asynchronous mode, the process continues running until manually terminated.

Note: This class is primarily used for LazyLLM's internal process management, especially in long-running server processes.

Examples:

>>> import lazyllm
>>> from lazyllm.common import ForkProcess
>>> import time
>>> import os
>>> def simple_task(task_id):
...     print(f"Process {os.getpid()} executing task {task_id}")
...     time.sleep(0.1)  
...     return f"Task {task_id} completed by process {os.getpid()}"
>>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
>>> process.start()
Process 12345 executing task 1
Source code in lazyllm/common/multiprocessing.py
class ForkProcess(multiprocessing.Process):
    """Enhanced process class provided by LazyLLM, inheriting from `multiprocessing.Process`.

    ``start()`` runs inside ``_ctx('fork')`` and the target is wrapped by ``work`` to support
    synchronous/asynchronous execution modes.

    Args:
        group: Process group, default to ``None``
        target: Function to be executed in the process, default to ``None``
        name: Process name, default to ``None``
        args: Tuple of arguments to pass to the target function, default to ``()``
        kwargs: Dictionary of keyword arguments to pass to the target function, default to ``{}``
        daemon: Whether the process is a daemon process, default to ``None``
        sync: Whether to use synchronous mode, default to ``True``. In synchronous mode, the
            process automatically exits after executing the target function; in asynchronous
            mode, the process continues running until manually terminated.

    **Note**: This class is primarily used for LazyLLM's internal process management, especially in long-running server processes.


    Examples:

        >>> import lazyllm
        >>> from lazyllm.common import ForkProcess
        >>> import time
        >>> import os
        >>> def simple_task(task_id):
        ...     print(f"Process {os.getpid()} executing task {task_id}")
        ...     time.sleep(0.1)
        ...     return f"Task {task_id} completed by process {os.getpid()}"
        >>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
        >>> process.start()
        Process 12345 executing task 1
    """
    def __init__(self, group=None, target=None, name=None, args=(),
                 kwargs=None, *, daemon=None, sync=True):
        # `kwargs or {}` gives each instance its own dict instead of a shared mutable default.
        super().__init__(group, ForkProcess.work(target, sync), name, args, kwargs or {}, daemon=daemon)

    @staticmethod
    def work(f, sync):
        """Wrap the target function with the synchronous/asynchronous execution logic.

        Args:
            f: Target function to execute. May be ``None`` (the ``Process`` default), in
                which case nothing is called.
            sync: Whether to use synchronous mode. In synchronous mode, the wrapper returns
                after executing the target function; in asynchronous mode, it keeps sleeping
                in a loop so the process stays alive.

        **Returns:**

        - Callable: A wrapper accepting the target's arguments. The target's return value is
          discarded.
        """
        def impl(*args, **kw):
            try:
                # multiprocessing.Process.run() skips a missing target; do the same here
                # instead of failing with "'NoneType' object is not callable".
                if f is not None:
                    f(*args, **kw)
                if not sync:
                    while True:
                        time.sleep(1)
            finally:
                # Run registered atexit handlers explicitly -- presumably because a forked
                # child does not run them on its own when it exits; TODO confirm.
                atexit._run_exitfuncs()
        return impl

    def start(self):
        """Start the ForkProcess.

        The parent ``start()`` is invoked inside the ``_ctx('fork')`` context manager so that
        the child process is created with the fork start method.

        Features of this method:

        - **Fork Start**: Uses fork method to create child processes, providing better performance on Unix/Linux systems
        - **Context Management**: ``_ctx`` (defined elsewhere in the module) is expected to manage the start-method context
        - **Parent Inheritance**: Returns whatever `multiprocessing.Process.start()` returns

        **Note**: This method actually creates a new process and begins execution, the process starts running immediately after calling.
        """
        with _ctx('fork'):
            return super().start()

start()

Start the ForkProcess. This method uses the fork start method to create a child process and begin executing the target function.

Features of this method:

  • Fork Start: Uses fork method to create child processes, providing better performance on Unix/Linux systems
  • Context Management: Automatically manages the context of process start methods, ensuring the correct start method is used
  • Parent Inheritance: Inherits all functionality from multiprocessing.Process.start()

Note: This method actually creates a new process and begins execution, the process starts running immediately after calling.

Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """Start the ForkProcess.

        The parent ``start()`` is invoked inside the ``_ctx('fork')`` context manager so that
        the child process is created with the fork start method.

        Features of this method:

        - **Fork Start**: Uses fork method to create child processes, providing better performance on Unix/Linux systems
        - **Context Management**: ``_ctx`` (defined elsewhere in the module) is expected to manage the start-method context
        - **Parent Inheritance**: Returns whatever `multiprocessing.Process.start()` returns

        **Note**: This method actually creates a new process and begins execution, the process starts running immediately after calling.
        """
        fork_ctx = _ctx('fork')
        with fork_ctx:
            return super().start()

work(f, sync) staticmethod

Core working method of ForkProcess, responsible for wrapping the target function and handling synchronous/asynchronous execution logic.

Parameters:

  • f

    Target function to execute

  • sync

    Whether to use synchronous mode. In synchronous mode, the process exits after executing the target function; in asynchronous mode, the process continues running.

Source code in lazyllm/common/multiprocessing.py
    @staticmethod
    def work(f, sync):
        """
Core working method of ForkProcess, responsible for wrapping the target function and handling synchronous/asynchronous execution logic.

Args:
    f: Target function to execute
    sync: Whether to use synchronous mode. In synchronous mode, the process exits after executing the target function; in asynchronous mode, the process continues running.
"""
        def impl(*args, **kw):
            try:
                f(*args, **kw)
                if not sync:
                    while True: time.sleep(1)
            finally:
                atexit._run_exitfuncs()
        return impl

Options

lazyllm.common.Option

Bases: object

Option management class provided by LazyLLM, used for managing multiple option values and iterating between them. This class is primarily used for parameter grid search and hyperparameter tuning scenarios.

Parameters:

  • *obj

    One or more option values, which can be objects of any type. If a single list or tuple is passed, it will be automatically expanded. At least two options must be provided.

Key features:

  • Multi-option Management: Can manage multiple different option values.
  • Iteration Support: Supports standard Python iteration protocol, allowing traversal of all options.
  • Current Value Access: Provides access to the currently selected option.
  • Deep Copy: Supports obtaining a deep copy of the currently selected option.
  • Cyclic Iteration: Options can be iterated over in a cyclic manner.

Note: This class is mainly used for LazyLLM's internal parameter search and trial management, especially in TrialModule for parameter grid search.

Examples:

>>> import lazyllm
>>> from lazyllm.common.option import Option
>>> learning_rates = Option(0.001, 0.01, 0.1)
>>> print(f"当前学习率: {learning_rates}")
当前学习率: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
>>> print(f"所有选项: {list(learning_rates)}")
所有选项: [0.001, 0.01, 0.1]
Source code in lazyllm/common/option.py
class Option(object):
    """Hold several candidate values and expose one of them as the current value.

    The class is intended for parameter grid search / hyper-parameter tuning:
    it stores the candidates, tracks which one is currently selected, and can be
    iterated over.

    Args:
        *obj: The candidate values (any type). A single list or tuple argument is
            expanded into the candidates. At least two candidates are required.

    Key features:

    - **Multi-option Management**: several candidate values are stored together.
    - **Iteration Support**: ``iter(option)`` returns an ``_OptionIterator``.
    - **Current Value Access**: the selected candidate is kept in ``_obj``.
    - **Deep Copy**: ``copy.deepcopy(option)`` yields a deep copy of the current
      candidate, not of the ``Option`` itself.
    - **Cyclic Iteration**: stepping past the last candidate resets the selection
      to the first one.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.common.option import Option
        >>> learning_rates = Option(0.001, 0.01, 0.1)
        >>> print(f"Current learning rate: {learning_rates}")
        Current learning rate: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
        >>> print(f"All options: {list(learning_rates)}")
        All options: [0.001, 0.01, 0.1]
    """
    def __init__(self, *obj):
        # A lone list/tuple argument is taken as the collection of candidates.
        single_sequence = len(obj) == 1 and isinstance(obj[0], (tuple, list))
        if single_sequence:
            obj = obj[0]
        assert isinstance(obj, (tuple, list)) and len(obj) > 1, 'More than one option shoule be given'
        self._objs = obj
        # Assigning ``_idx`` also refreshes ``_obj`` through ``__setattr__``.
        self._idx = 0
        self._obj = self._objs[self._idx]

    def _next(self):
        """Select the next candidate; reset to the first and raise StopIteration at the end."""
        upcoming = self._idx + 1
        if upcoming == len(self._objs):
            self._idx = 0
            raise StopIteration
        self._idx = upcoming

    def __setattr__(self, __name: str, __value: Any) -> None:
        object.__setattr__(self, __name, __value)
        if __name != '_idx':
            return
        # Keep the cached current value in sync whenever a valid index is set.
        if 0 <= self._idx < len(self._objs):
            self._obj = self._objs[self._idx]

    def __deepcopy__(self, *args, **kw):
        current = self._obj
        return copy.deepcopy(current)

    def __iter__(self):
        return _OptionIterator(self)

    def __repr__(self):
        options, current = self._objs, self._obj
        return f'<Option options="{options}" curr="{current}">'

DynamicDescriptor

lazyllm.common.DynamicDescriptor

Dynamic descriptor class for creating descriptors that support both instance and class level calls.

Parameters:

  • func (callable) –

    Function or method to be wrapped

Source code in lazyllm/common/common.py
class DynamicDescriptor:
    """Descriptor whose wrapped function can be called on an instance or on the class.

    Accessing the attribute returns a callable. When the attribute was reached
    through an instance, the wrapped function receives that instance as its first
    argument; when it was reached through the class, it receives the class.

    Args:
        func (callable): Function or method to be wrapped
    """
    class Impl:
        # Callable produced by attribute access; it remembers the function and
        # what the attribute was accessed on.
        def __init__(self, func, instance, owner):
            self._func, self._instance, self._owner = func, instance, owner

        def __call__(self, *args, **kw):
            # ``__get__`` receives ``instance=None`` for class-level access, so
            # compare against None explicitly. A truthiness test would wrongly
            # dispatch falsy instances (e.g. objects with ``__len__() == 0`` or
            # ``__bool__() == False``) to the class.
            receiver = self._owner if self._instance is None else self._instance
            return self._func(receiver, *args, **kw)

        def __repr__(self): return repr(self._func)
        # Expose the wrapped function's docstring as this object's ``__doc__``.
        __doc__ = property(lambda self: self._func.__doc__)

        @__doc__.setter
        def __doc__(self, value): self._func.__doc__ = value

    def __init__(self, func):
        self.__func__ = func

    def __get__(self, instance, owner):
        return DynamicDescriptor.Impl(self.__func__, instance, owner)

lazyllm.common.CaseInsensitiveDict

Bases: dict

Case-insensitive dictionary class.

CaseInsensitiveDict inherits from dict and provides case-insensitive key-value storage and retrieval. All keys are converted to lowercase when stored, ensuring that values can be accessed regardless of whether the key name is uppercase, lowercase, or mixed case.

Features
  • All keys are automatically converted to lowercase when stored
  • Supports standard dictionary operations (get, set, check containment)
  • Maintains all original dict functionality, only differs in key name handling

Parameters:

  • *args

    Positional arguments passed to the parent dict class

  • **kwargs

    Keyword arguments passed to the parent dict class

Examples:

>>> from lazyllm.common import CaseInsensitiveDict
>>> # Create a case-insensitive dictionary
>>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
>>> 
>>> # Access the same key using different casing
>>> print(d['name'])      # lowercase
John
>>> print(d['NAME'])      # uppercase
John
>>> print(d['Name'])      # capitalized
John
>>> 
>>> # Keys are also lower-cased when values are set
>>> d['EMAIL'] = 'john@example.com'
>>> print(d['email'])     # access with lowercase
john@example.com
>>> 
>>> # Membership checks are case-insensitive
>>> 'AGE' in d
True
>>> 'age' in d
True
>>> 'Age' in d
True
>>> 
>>> # Standard dictionary operations are supported
>>> d['PHONE'] = '123-456-7890'
>>> print(d.get('phone'))
123-456-7890
>>> print(len(d))
5
Source code in lazyllm/common/common.py
class CaseInsensitiveDict(dict):
    """Case-insensitive dictionary class.

    CaseInsensitiveDict inherits from dict and lower-cases every key before it is
    stored or looked up, so a value can be reached regardless of the casing used
    for its key. Keys must be strings.

    Features:
        - All keys are converted to lowercase when stored
        - Lookup, assignment, deletion, containment, ``get``, ``pop``,
          ``setdefault`` and ``update`` all normalise the key first
        - Everything else behaves like a plain dict

    Args:
        *args: Positional arguments accepted by ``dict``
        **kwargs: Keyword arguments accepted by ``dict``

    Examples:
        >>> from lazyllm.common import CaseInsensitiveDict
        >>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
        >>> print(d['name'])
        John
        >>> print(d['NAME'])
        John
        >>> d['EMAIL'] = 'john@example.com'
        >>> print(d['email'])
        john@example.com
        >>> 'Age' in d
        True
        >>> d['PHONE'] = '123-456-7890'
        >>> print(d.get('Phone'))
        123-456-7890
        >>> print(len(d))
        5
    """
    def __init__(self, *args, **kwargs):
        super().__init__()
        # Route every initial item through ``__setitem__`` so keys are normalised.
        for key, value in dict(*args, **kwargs).items():
            assert isinstance(key, str)
            self[key] = value

    def __getitem__(self, key):
        assert isinstance(key, str)
        return super().__getitem__(key.lower())

    def __setitem__(self, key, value):
        assert isinstance(key, str)
        super().__setitem__(key.lower(), value)

    def __delitem__(self, key):
        assert isinstance(key, str)
        super().__delitem__(key.lower())

    def __contains__(self, key):
        assert isinstance(key, str)
        return super().__contains__(key.lower())

    # The built-in dict methods below do not go through ``__getitem__`` /
    # ``__setitem__``, so they need their own key normalisation.
    def get(self, key, default=None):
        """Return the value stored for ``key`` (any casing), or ``default``."""
        assert isinstance(key, str)
        return super().get(key.lower(), default)

    def pop(self, key, *default):
        """Remove ``key`` (any casing) and return its value, like ``dict.pop``."""
        assert isinstance(key, str)
        return super().pop(key.lower(), *default)

    def setdefault(self, key, default=None):
        """Return the value for ``key`` (any casing), inserting ``default`` if absent."""
        assert isinstance(key, str)
        return super().setdefault(key.lower(), default)

    def update(self, *args, **kwargs):
        """Merge items like ``dict.update`` while lower-casing every incoming key."""
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

lazyllm.common.ProcessPoolExecutor

Bases: ProcessPoolExecutor

Source code in lazyllm/common/multiprocessing.py
class ProcessPoolExecutor(PPE):
    """Process pool executor that serializes each task with ``dump_obj`` before submission."""

    def submit(self, fn, /, *args, **kwargs):
        """Submit a task to the process pool for execution.

        The function and its arguments are bound together, serialized with
        ``dump_obj``, and the serialized payload is submitted to the parent pool
        with ``_worker`` as the callable.
        NOTE(review): ``_worker`` presumably deserializes and runs the payload in
        the worker process; it is defined outside this listing.

        Args:
            fn (Callable): The function to execute.
            *args: Positional arguments passed to the function.
            **kwargs: Keyword arguments passed to the function.

        **Returns:**

        - concurrent.futures.Future: A `Future` object representing the task's execution status.

        Examples:

            >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
            >>> import time
            >>>
            >>> def task(x):
            ...     time.sleep(1)
            ...     return x * 2
            ...
            >>> with ProcessPoolExecutor(max_workers=2) as executor:
            ...     future = executor.submit(task, 5)
            ...     result = future.result()
            ...     print(result)
            10
        """
        bound_call = functools.partial(fn, *args, **kwargs)
        payload = dump_obj(bound_call)
        return super().submit(_worker, payload)

submit(fn, /, *args, **kwargs)

Submit a task to the process pool for execution.

This method serializes a function and its arguments, then submits them to the process pool for execution. It returns a Future object to track the task's status or result.

Parameters:

  • fn (Callable) –

    The function to execute.

  • *args

    Positional arguments passed to the function.

  • **kwargs

    Keyword arguments passed to the function.

Returns:

  • concurrent.futures.Future: A Future object representing the task's execution status.

Examples:

>>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
>>> import time
>>> 
>>> def task(x):
...     time.sleep(1)
...     return x * 2
... 
>>> with ProcessPoolExecutor(max_workers=2) as executor:
...     future = executor.submit(task, 5)
...     result = future.result()
...     print(result)
10
Source code in lazyllm/common/multiprocessing.py
    def submit(self, fn, /, *args, **kwargs):
        """Submit a task to the process pool for execution.

        The function and its arguments are bound together, serialized with
        ``dump_obj``, and the serialized payload is submitted to the parent pool
        with ``_worker`` as the callable.

        Args:
            fn (Callable): The function to execute.
            *args: Positional arguments passed to the function.
            **kwargs: Keyword arguments passed to the function.

        **Returns:**

        - concurrent.futures.Future: A `Future` object representing the task's execution status.

        Examples:

            >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
            >>> import time
            >>>
            >>> def task(x):
            ...     time.sleep(1)
            ...     return x * 2
            ...
            >>> with ProcessPoolExecutor(max_workers=2) as executor:
            ...     future = executor.submit(task, 5)
            ...     result = future.result()
            ...     print(result)
            10
        """
        bound_call = functools.partial(fn, *args, **kwargs)
        payload = dump_obj(bound_call)
        return super().submit(_worker, payload)

lazyllm.common.ArgsDict

Bases: dict

Parameter dictionary class for managing and validating command line arguments.

Parameters:

  • *args

    Positional arguments passed to parent dict class

  • **kwargs

    Keyword arguments passed to parent dict class

Returns:

  • ArgsDict instance providing parameter checking and formatting functionality
Source code in lazyllm/common/common.py
class ArgsDict(dict):
    """Parameter dictionary that can validate updates and render itself as command line text.

    Args:
        *args: Positional arguments passed to parent dict class
        with_line: When truthy (default), rendered arguments are prefixed with ``--``.
        **kwargs: Keyword arguments passed to parent dict class
    """
    def __init__(self, *args, with_line=True, **kwargs):
        super().__init__(*args, **kwargs)
        self._with_line = with_line

    def check_and_update(self, kw):
        """Check and update parameter dictionary.

        ``'skip_check'`` is popped from ``kw`` (so ``kw`` is modified); when it is
        absent, ``config['deploy_skip_check_kw']`` decides whether to skip. Unless
        skipped, every key of ``kw`` must already exist in this dictionary.

        Args:
            kw (dict): Parameter dictionary to update
        """
        skip = kw.pop('skip_check', config['deploy_skip_check_kw'])
        if not skip:
            assert set(kw.keys()).issubset(set(self)), f'unexpected keys: {set(kw.keys()) - set(self)}'
        self.update(kw)

    def parse_kwargs(self):
        """Parse parameter dictionary into command line argument string.

        Dict values are dumped as JSON with their double quotes backslash-escaped;
        string values (including those dumped dicts) are wrapped in double quotes.
        Items are joined with single spaces.
        """
        prefix = '--' if self._with_line else ''
        pieces = []
        for name, value in self.items():
            if type(value) is dict:
                value = json.dumps(value).replace('\"', '\\\"')
            rendered = f'\"{value}\"' if type(value) is str else f'{value}'
            pieces.append(f'{prefix}{name}={rendered}')
        return ' '.join(pieces)

check_and_update(kw)

Check and update parameter dictionary.

Parameters:

  • kw (dict) –

    Parameter dictionary to update

Source code in lazyllm/common/common.py
    def check_and_update(self, kw):
        """Check and update parameter dictionary.

        ``'skip_check'`` is popped from ``kw`` (so ``kw`` is modified); when it is
        absent, ``config['deploy_skip_check_kw']`` decides whether to skip. Unless
        skipped, every key of ``kw`` must already exist in this dictionary.

        Args:
            kw (dict): Parameter dictionary to update
        """
        skip = kw.pop('skip_check', config['deploy_skip_check_kw'])
        if not skip:
            assert set(kw.keys()).issubset(set(self)), f'unexpected keys: {set(kw.keys()) - set(self)}'
        self.update(kw)

parse_kwargs()

Parse parameter dictionary into command line argument string.

Source code in lazyllm/common/common.py
    def parse_kwargs(self):
        """Parse parameter dictionary into command line argument string.
"""
        string = []
        for k, v in self.items():
            if type(v) is dict:
                v = json.dumps(v).replace('\"', '\\\"')
            if self._with_line:
                string.append(f'--{k}={v}' if type(v) is not str else f'--{k}=\"{v}\"')
            else:
                string.append(f'{k}={v}' if type(v) is not str else f'{k}=\"{v}\"')
        string = ' '.join(string)
        return string

Threading

lazyllm.common.Thread

Bases: Thread

Enhanced thread class provided by LazyLLM, inheriting from Python's standard library threading.Thread. This class provides additional functionality including session ID management, pre-hook function support, and exception handling mechanisms.

Parameters:

  • group

    Thread group, default to None

  • target

    Function to be executed in the thread, default to None

  • name

    Thread name, default to None

  • args

    Tuple of arguments to pass to the target function, default to ()

  • kwargs

    Dictionary of keyword arguments to pass to the target function, default to None

  • prehook

    Function or list of functions to call before thread execution, default to None

  • daemon

    Whether the thread is a daemon thread, default to None

Examples:

>>> import lazyllm
>>> from lazyllm.common.threading import Thread
>>> import time
>>> def simple_task(name):
...     time.sleep(0.1)
...     return f"Hello from {name}"
>>> thread = Thread(target=simple_task, args=("Worker",))
>>> thread.start()
>>> result = thread.get_result()
>>> print(result)
Hello from Worker
>>> def setup_environment():
...     print("Setting up environment...")
...     return "environment_ready"
>>> def validate_input(data):
...     print(f"Validating input: {data}")
...     if not isinstance(data, (int, float)):
...         raise ValueError("Input must be numeric")
>>> def process_data(data):
...     print(f"Processing data: {data}")
...     time.sleep(0.1) 
...     return data * 2
>>> thread = Thread(
...     target=process_data,
...     args=(42,),
...     prehook=[setup_environment, lambda: validate_input(42)]
... )
>>> thread.start()
Setting up environment...
Validating input: 42
Processing data: 42
>>> result = thread.get_result()
>>> print(f"Final result: {result}")
Final result: 84
Source code in lazyllm/common/threading.py
class Thread(threading.Thread):
    """Enhanced thread class provided by LazyLLM, inheriting from `threading.Thread`.

    On top of the standard thread it runs pre-hook callables before the target
    (the first one always being a session-id setter), and it stores the target's
    return value or exception in a queue so the caller can fetch it with
    ``get_result()``.

    Args:
        group: Thread group, default to ``None``
        target: Function to be executed in the thread, default to ``None``
        name: Thread name, default to ``None``
        args: Tuple of arguments to pass to the target function, default to ``()``
        kwargs: Dictionary of keyword arguments to pass to the target function, default to ``None``
        prehook: Function or list/tuple of functions to call before thread execution, default to ``None``
        daemon: Whether the thread is a daemon thread, default to ``None``

    Examples:
        >>> import lazyllm
        >>> from lazyllm.common.threading import Thread
        >>> import time
        >>> def simple_task(name):
        ...     time.sleep(0.1)
        ...     return f"Hello from {name}"
        >>> thread = Thread(target=simple_task, args=("Worker",))
        >>> thread.start()
        >>> result = thread.get_result()
        >>> print(result)
        Hello from Worker
        >>> def setup_environment():
        ...     print("Setting up environment...")
        >>> def process_data(data):
        ...     print(f"Processing data: {data}")
        ...     return data * 2
        >>> thread = Thread(target=process_data, args=(42,), prehook=[setup_environment])
        >>> thread.start()
        Setting up environment...
        Processing data: 42
        >>> print(f"Final result: {thread.get_result()}")
        Final result: 84
    """
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, prehook=None, daemon=None):
        self.q = Queue()
        # Always work on a private list: a tuple has no ``insert`` and a caller's
        # list must not be mutated by prepending the session-id setter.
        if isinstance(prehook, (tuple, list)):
            hooks = list(prehook)
        else:
            hooks = [prehook] if prehook else []
        hooks.insert(0, functools.partial(_sid_setter, sid=globals._sid))
        super().__init__(group, self.work, name, (hooks, target, args), kwargs, daemon=daemon)

    def work(self, prehook, target, args, **kw):
        """Run the pre-hooks and the target, then publish the outcome to the result queue.

        Args:
            prehook: List of pre-hook functions to call before thread execution
            target: Target function to execute
            args: Arguments to pass to the target function
            **kw: Keyword arguments to pass to the target function

        **Note**: This method is called internally by the `Thread` class, users typically
        don't need to call this method directly.
        """
        try:
            # Hooks run inside the ``try`` so that a failing hook is reported
            # through the queue instead of leaving ``get_result()`` blocked.
            for hook in prehook:
                hook()
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

    def get_result(self):
        """Block until the thread has published an outcome, then return or re-raise it.

        **Returns:**

        - The value returned by the target function.

        Raises:
            Exception: The exception raised inside the thread, re-raised here. Any
                queued object that is an ``Exception`` instance is raised.

        **Note**: This method should be used after calling `thread.start()`.
        """
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

get_result()

Method to retrieve the thread execution result. This method blocks until the thread execution is complete, then returns the execution result or re-raises the exception.

Returns:

  • The result of thread execution. If the target function executes normally, returns its return value; if an exception occurs, re-raises that exception.

Note: This method should be used after calling thread.start() to retrieve the thread execution result.

Source code in lazyllm/common/threading.py
    def get_result(self):
        """Method to retrieve the thread execution result. This method blocks until the thread execution is complete, then returns the execution result or re-raises the exception.

**Returns:**

- The result of thread execution. If the target function executes normally, returns its return value; if an exception occurs, re-raises that exception.

**Note**: This method should be used after calling `thread.start()` to retrieve the thread execution result.
"""
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

work(prehook, target, args, **kw)

Core working method of the thread, responsible for executing pre-hook functions, target function, and handling exceptions and results.

Parameters:

  • prehook

    List of pre-hook functions to call before thread execution

  • target

    Target function to execute

  • args

    Arguments to pass to the target function

  • **kw

    Keyword arguments to pass to the target function

Note: This method is called internally by the Thread class, users typically don't need to call this method directly.

Source code in lazyllm/common/threading.py
    def work(self, prehook, target, args, **kw):
        """Core working method of the thread, responsible for executing pre-hook functions, target function, and handling exceptions and results.

Args:
    prehook: List of pre-hook functions to call before thread execution
    target: Target function to execute
    args: Arguments to pass to the target function
    **kw: Keyword arguments to pass to the target function

**Note**: This method is called internally by the `Thread` class, users typically don't need to call this method directly.
"""
        [p() for p in prehook]
        try:
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

LazyLLMCMD

lazyllm.common.LazyLLMCMD

Bases: object

Command line operation wrapper class providing secure and flexible command management.

Parameters:

  • cmd (Union[str, List[str], Callable]) –

    Command input; three formats are supported: a command string, a list of commands, or a callable object.

  • return_value (Any, default: None ) –

    Preset return value.

  • checkf (Any, default: lambda *a: True ) –

    Command validation function with signature.

  • no_displays (Any, default: None ) –

    Sensitive parameter names to filter.

Examples:

>>> from lazyllm.common import LazyLLMCMD
>>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
>>> print(cmd.get_args("epochs"))
50
>>> print(cmd.get_args("batch-size")) 
32
>>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
>>> new = base.with_cmd("python predict.py")
Source code in lazyllm/common/common.py
class LazyLLMCMD(object):
    """Command line operation wrapper class providing secure and flexible command management.

    Args:
        cmd (Union[str, List[str], Callable]): Command input. A list/tuple of command
            strings is joined with ``' && '``; a string or a callable is stored as is.
        return_value (Any): Preset return value.
        checkf (Any): Command validation function; defaults to one that always returns True.
        no_displays (Any): Names of arguments that are stripped from ``str(cmd)``.

    Examples:
        >>> from lazyllm.common import LazyLLMCMD
        >>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
        >>> print(cmd.get_args("epochs"))
        50
        >>> print(cmd.get_args("batch-size"))
        32
        >>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
        >>> new = base.with_cmd("python predict.py")
    """
    def __init__(self, cmd, *, return_value=None, checkf=(lambda *a: True), no_displays=None):
        if isinstance(cmd, (tuple, list)):
            cmd = ' && '.join(cmd)
        assert isinstance(cmd, str) or callable(cmd), 'cmd must be func or (list of) bash command str.'
        self.cmd = cmd
        self.return_value = return_value
        self.checkf = checkf
        self.no_displays = no_displays

    def __hash__(self):
        return hash(self.cmd)

    def __str__(self):
        """Return the command text with secrets masked and hidden arguments removed."""
        assert not callable(self.cmd), f'Cannot convert cmd function {self.cmd} to str'
        # Mask values of LAZYLLM_*_API_KEY / LAZYLLM_*_SECRET_KEY assignments.
        cmd = re.sub(r'\b(LAZYLLM_[A-Z0-9_]*?_(?:API|SECRET)_KEY)=\S+', r'\1=xxxxxx', self.cmd)
        # Drop every "-name value" / "--name=value" occurrence listed in no_displays.
        for item in self.no_displays or ():
            pattern = r'(-{1,2}' + re.escape(item) + r')(\s|=|)(\S+|)'
            cmd = re.sub(pattern, '', cmd)
        return cmd

    def with_cmd(self, cmd):
        """Create new command object inheriting current configuration.

        Args:
            cmd: New command content (must be same type as original)

        Returns:
            A new ``LazyLLMCMD`` sharing this object's ``return_value``, ``checkf``
            and ``no_displays``.
        """
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance

    def get_args(self, key):
        """Extracts specified argument value from command string.

        Args:
            key: Argument name, without the leading dashes.

        Returns:
            The text following the first ``-key`` / ``--key`` occurrence (separated
            by ``=``, one whitespace character, or nothing) as a string, or ``None``
            when the argument does not appear in the command.
        """
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        # ``re.search`` is required: the argument rarely sits at the very start of
        # the command, and the pattern must not begin with a bare ``*`` quantifier.
        pattern = r'(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)'
        found = re.search(pattern, self.cmd)
        return found[3] if found else None

get_args(key)

Extracts specified argument value from command string.

Parameters:

  • key

    Argument name

Source code in lazyllm/common/common.py
    def get_args(self, key):
        """Extracts specified argument value from command string.

Args:
    key: Argument name
"""
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        pattern = r'*(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)*'
        return re.match(pattern, self.cmd)[3]

with_cmd(cmd)

Create new command object inheriting current configuration.

Parameters:

  • cmd

    New command content (must be same type as original)

Source code in lazyllm/common/common.py
    def with_cmd(self, cmd):
        """Create new command object inheriting current configuration.

Args:
    cmd: New command content (must be same type as original)

"""
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance

lazyllm.common.utils.SecurityVisitor

Bases: NodeVisitor

AST-based security analyzer to detect unsafe operations in Python code.

IMPORTANT: Method names within this class (e.g., visit_Call, visit_Import) should not be renamed to lowercase. These method names are part of the NodeVisitor pattern from the ast module and must remain consistent with this naming convention to function correctly.

Source code in lazyllm/common/utils.py
class SecurityVisitor(ast.NodeVisitor):  # noqa C901
    """
    AST-based security analyzer to detect unsafe operations in Python code.

    Visiting a tree raises ``ValueError`` at the first construct that matches one
    of the blacklists below.

    IMPORTANT: Method names within this class (e.g., `visit_Call`, `visit_Import`) **should not**
    be renamed to lowercase. These method names are part of the `NodeVisitor` pattern from the `ast`
    module and must remain consistent with this naming convention to function correctly.
    """

    # **Dangerous built-in functions**
    DANGEROUS_BUILTINS = {'exec', 'eval', 'open', 'compile', 'getattr',
                          'setattr', '__import__', 'globals', 'locals', 'vars'}

    # **Dangerous os operations**
    DANGEROUS_OS_CALLS = {'system', 'popen', 'remove', 'rmdir', 'unlink', 'rename'}

    # **Dangerous sys operations**
    DANGEROUS_SYS_CALLS = {'exit', 'modules'}

    # **Dangerous modules**
    DANGEROUS_MODULES = {'pickle', 'subprocess', 'socket', 'shutil', 'requests', 'inspect', 'tempfile'}

    def _is_dangerous_module(self, dotted_name):
        """Return True when the top-level package of ``dotted_name`` is blacklisted."""
        # ``dotted_name`` is None for relative imports such as ``from . import x``.
        if not dotted_name:
            return False
        return dotted_name.split('.', 1)[0] in self.DANGEROUS_MODULES

    def visit_Call(self, node):  # noqa C901
        """Check function calls"""
        # Direct calls to dangerous built-in functions
        if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_BUILTINS:
            raise ValueError(f'⚠️ Detected dangerous function call: {node.func.id}')

        # Check for __import__ calls with string arguments. Only reachable when a
        # subclass removes '__import__' from DANGEROUS_BUILTINS. String literals
        # are ``ast.Constant`` nodes; ``ast.Str`` is deprecated and removed in
        # newer Python versions.
        if isinstance(node.func, ast.Name) and node.func.id == '__import__':
            first_arg = node.args[0] if node.args else None
            if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
                module_name = first_arg.value
                if self._is_dangerous_module(module_name):
                    raise ValueError(f'⚠️ Detected dangerous module import via __import__: {module_name}')

        # Check for indirect __import__ calls (function calls that might return __import__)
        if isinstance(node.func, ast.Call):
            # Check if this is a call to a function that might return __import__
            if isinstance(node.func.func, ast.Name):
                func_name = node.func.func.id
                if func_name in ['get_import', 'import_func']:  # Common patterns
                    raise ValueError(f'⚠️ Detected suspicious function call that might return __import__: {func_name}')

        # Check for attribute access that might lead to __import__
        if isinstance(node.func, ast.Attribute):
            if isinstance(node.func.value, ast.Name) and node.func.value.id == 'ImportHelper':
                if node.func.attr == 'get_import':
                    raise ValueError('⚠️ Detected suspicious method call: ImportHelper.get_import')

        # os / sys related calls
        if isinstance(node.func, ast.Attribute) and isinstance(node.func.value, ast.Name):
            if node.func.value.id == 'os' and node.func.attr in self.DANGEROUS_OS_CALLS:
                raise ValueError(f'⚠️ Detected dangerous os call: os.{node.func.attr}')
            if node.func.value.id == 'sys' and node.func.attr in self.DANGEROUS_SYS_CALLS:
                raise ValueError(f'⚠️ Detected dangerous sys call: sys.{node.func.attr}')

        self.generic_visit(node)

    def visit_Import(self, node):
        """Check import statements"""
        for alias in node.names:
            # Compare the top-level package so that submodule imports such as
            # ``import requests.adapters`` cannot bypass the blacklist.
            if self._is_dangerous_module(alias.name):
                raise ValueError(f'⚠️ Detected dangerous module import: {alias.name}')

    def visit_ImportFrom(self, node):
        """Check from ... import statements"""
        if self._is_dangerous_module(node.module):
            raise ValueError(f'⚠️ Detected dangerous module import: {node.module}')

    def visit_Attribute(self, node):
        """Check os.environ and tempfile usage"""
        if isinstance(node.value, ast.Name):
            if node.value.id == 'os' and node.attr == 'environ':
                raise ValueError('⚠️ Detected dangerous access: os.environ')
            if node.value.id == 'tempfile':
                raise ValueError(f'⚠️ Detected dangerous usage of tempfile: tempfile.{node.attr}')

        self.generic_visit(node)

    def visit_Lambda(self, node):
        """Check lambda functions that might return __import__"""
        # Check if lambda body returns __import__
        if isinstance(node.body, ast.Name) and node.body.id == '__import__':
            raise ValueError('⚠️ Detected lambda function returning __import__')
        self.generic_visit(node)

    def visit_ListComp(self, node):
        """Check list comprehensions that might contain __import__"""
        # Check if the expression in list comprehension is __import__
        if isinstance(node.elt, ast.Name) and node.elt.id == '__import__':
            raise ValueError('⚠️ Detected list comprehension containing __import__')
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        """Check function definitions that might return __import__"""
        # Only top-level ``return`` statements of the body are inspected here;
        # nested statements are reached through ``generic_visit``.
        for stmt in node.body:
            if isinstance(stmt, ast.Return):
                if isinstance(stmt.value, ast.Name) and stmt.value.id == '__import__':
                    raise ValueError(f'⚠️ Detected function {node.name} returning __import__')
        self.generic_visit(node)

visit_Attribute(node)

Check os.environ and tempfile usage

Source code in lazyllm/common/utils.py
def visit_Attribute(self, node):
    """Reject ``os.environ`` access and any attribute taken from ``tempfile``.

    Raises ``ValueError`` for those two patterns; otherwise continues the
    traversal into the node's children.
    """
    # Name of the object the attribute is read from, when it is a plain name.
    owner = node.value.id if isinstance(node.value, ast.Name) else None
    if owner == 'os' and node.attr == 'environ':
        raise ValueError('⚠️ Detected dangerous access: os.environ')
    if owner == 'tempfile':
        raise ValueError(f'⚠️ Detected dangerous usage of tempfile: tempfile.{node.attr}')

    self.generic_visit(node)

visit_Call(node)

Check function calls

Source code in lazyllm/common/utils.py
def visit_Call(self, node):  # noqa C901
    """Reject call expressions that are dangerous or look like import tricks.

    The following patterns are rejected, in this order:

    1. A direct call to a name listed in ``self.DANGEROUS_BUILTINS``.
    2. ``__import__('<literal>')`` where the literal is listed in
       ``self.DANGEROUS_MODULES``.
    3. Calling the result of ``get_import(...)`` / ``import_func(...)``.
    4. ``ImportHelper.get_import(...)``.
    5. ``os.<attr>(...)`` / ``sys.<attr>(...)`` where the attribute is listed
       in ``self.DANGEROUS_OS_CALLS`` / ``self.DANGEROUS_SYS_CALLS``.

    Args:
        node: The call node being visited.

    Raises:
        ValueError: If the call matches any of the patterns above.
    """
    func = node.func

    # Direct calls to dangerous built-in functions
    if isinstance(func, ast.Name) and func.id in self.DANGEROUS_BUILTINS:
        raise ValueError(f'⚠️ Detected dangerous function call: {func.id}')

    # Check for __import__ calls with string arguments
    if isinstance(func, ast.Name) and func.id == '__import__' and node.args:
        first_arg = node.args[0]
        # String literals are ast.Constant nodes; the legacy ast.Str alias (and
        # its .s attribute) is deprecated since Python 3.8 and removed in 3.14.
        if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
            module_name = first_arg.value
            if module_name in self.DANGEROUS_MODULES:
                raise ValueError(f'⚠️ Detected dangerous module import via __import__: {module_name}')

    # Check for indirect __import__ calls (function calls that might return __import__)
    if isinstance(func, ast.Call) and isinstance(func.func, ast.Name):
        func_name = func.func.id
        if func_name in ['get_import', 'import_func']:  # Common patterns
            raise ValueError(f'⚠️ Detected suspicious function call that might return __import__: {func_name}')

    if isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
        owner, attr = func.value.id, func.attr

        # Check for attribute access that might lead to __import__
        if owner == 'ImportHelper' and attr == 'get_import':
            raise ValueError('⚠️ Detected suspicious method call: ImportHelper.get_import')

        # os / sys related calls
        if owner == 'os' and attr in self.DANGEROUS_OS_CALLS:
            raise ValueError(f'⚠️ Detected dangerous os call: os.{attr}')
        if owner == 'sys' and attr in self.DANGEROUS_SYS_CALLS:
            raise ValueError(f'⚠️ Detected dangerous sys call: sys.{attr}')

    # Keep walking so the callee and argument expressions are visited as well.
    self.generic_visit(node)

visit_FunctionDef(node)

Check function definitions that might return `__import__`

Source code in lazyllm/common/utils.py
def visit_FunctionDef(self, node):
    """Reject functions that directly ``return __import__``.

    Only ``return`` statements sitting directly in the function body are
    inspected here; returns nested inside other statements are not.

    Args:
        node: The function-definition node being visited.

    Raises:
        ValueError: If a top-level ``return`` statement returns the bare
            name ``__import__``.
    """
    top_level_returns = [stmt for stmt in node.body if isinstance(stmt, ast.Return)]
    for ret in top_level_returns:
        value = ret.value
        if not isinstance(value, ast.Name):
            continue
        if value.id == '__import__':
            raise ValueError(f'⚠️ Detected function {node.name} returning __import__')
    # Keep walking so the statements inside the function are visited as well.
    self.generic_visit(node)

visit_Import(node)

Check import statements

Source code in lazyllm/common/utils.py
def visit_Import(self, node):
    """Reject ``import`` statements that pull in a dangerous module.

    A module is rejected when its full dotted name, or any leading package
    of it, is listed in ``self.DANGEROUS_MODULES``. Checking the prefixes
    closes the bypass where ``import pkg.sub`` slipped past a ban on ``pkg``.

    Args:
        node: The ``import`` statement node being visited.

    Raises:
        ValueError: If any imported name is, or lives under, a module listed
            in ``self.DANGEROUS_MODULES``.
    """
    for alias in node.names:
        parts = alias.name.split('.')
        # 'a.b.c' is checked as 'a', 'a.b' and 'a.b.c'.
        prefixes = ('.'.join(parts[:end]) for end in range(1, len(parts) + 1))
        if any(prefix in self.DANGEROUS_MODULES for prefix in prefixes):
            raise ValueError(f'⚠️ Detected dangerous module import: {alias.name}')

visit_ImportFrom(node)

Check from ... import statements

Source code in lazyllm/common/utils.py
def visit_ImportFrom(self, node):
    """Reject ``from ... import ...`` statements targeting a dangerous module.

    A module is rejected when its full dotted name, or any leading package
    of it, is listed in ``self.DANGEROUS_MODULES``. Checking the prefixes
    closes the bypass where ``from pkg.sub import x`` slipped past a ban on
    ``pkg``.

    Args:
        node: The ``from ... import ...`` statement node being visited.

    Raises:
        ValueError: If the source module is, or lives under, a module listed
            in ``self.DANGEROUS_MODULES``.
    """
    module = node.module
    if not module:
        # ``from . import x`` has no module name, so there is nothing to match.
        return
    parts = module.split('.')
    # 'a.b.c' is checked as 'a', 'a.b' and 'a.b.c'.
    prefixes = ('.'.join(parts[:end]) for end in range(1, len(parts) + 1))
    if any(prefix in self.DANGEROUS_MODULES for prefix in prefixes):
        raise ValueError(f'⚠️ Detected dangerous module import: {module}')

visit_Lambda(node)

Check lambda functions that might return `__import__`

Source code in lazyllm/common/utils.py
def visit_Lambda(self, node):
    """Reject lambdas whose body is the bare name ``__import__``.

    Args:
        node: The lambda node being visited; only its ``body`` is inspected.

    Raises:
        ValueError: If the lambda body is exactly the name ``__import__``.
    """
    result_expr = node.body
    if isinstance(result_expr, ast.Name):
        if result_expr.id == '__import__':
            raise ValueError('⚠️ Detected lambda function returning __import__')
    # Keep walking so the children of this lambda are visited as well.
    self.generic_visit(node)

visit_ListComp(node)

Check list comprehensions that might contain `__import__`

Source code in lazyllm/common/utils.py
def visit_ListComp(self, node):
    """Reject list comprehensions whose element expression is ``__import__``.

    Args:
        node: The list-comprehension node being visited; only its ``elt``
            (the produced element expression) is inspected.

    Raises:
        ValueError: If the element expression is exactly the name
            ``__import__``.
    """
    produced = node.elt
    if isinstance(produced, ast.Name):
        if produced.id == '__import__':
            raise ValueError('⚠️ Detected list comprehension containing __import__')
    # Keep walking so the children of this comprehension are visited as well.
    self.generic_visit(node)

lazyllm.common.common.Finalizer

Bases: object

Finalizer class for managing resource cleanup and release operations. Can be used as a context manager or trigger cleanup automatically when object is destroyed.

Parameters:

  • func1 (Callable) –

    Primary cleanup function. If func2 is provided, func1 is executed immediately and func2 becomes the cleanup function.

  • func2 (Optional[Callable], default: None ) –

    Optional cleanup function, defaults to None.

  • condition (Callable, default: lambda: True ) –

    Condition function, cleanup is executed only when it returns True, defaults to always returning True.

Uses:

  1. Can be used as a context manager (with statement)
  2. Can trigger cleanup automatically when object is destroyed
  3. Supports conditional cleanup
  4. Supports two-phase initialization and cleanup

Note
  • When func2 is provided, func1 is executed immediately during initialization
  • Cleanup function is executed only once
  • Cleanup occurs when object is destroyed or context is exited
Source code in lazyllm/common/common.py
class Finalizer(object):
    """Finalizer class for managing resource cleanup and release operations.

    Can be used as a context manager or trigger cleanup automatically when the
    object is destroyed.

    Args:
        func1 (Callable): Primary cleanup function. If func2 is provided, func1
            is executed immediately and func2 becomes the cleanup function.
        func2 (Optional[Callable]): Optional cleanup function, defaults to None.
        condition (Callable): Condition function, cleanup is executed only when
            it returns True, defaults to always returning True.

    Uses:
        1. Can be used as a context manager (with statement)
        2. Can trigger cleanup automatically when object is destroyed
        3. Supports conditional cleanup
        4. Supports two-phase initialization and cleanup

    Note:
        - When func2 is provided, func1 is executed immediately during
          initialization
        - Cleanup function is executed at most once, even if it raises
        - Cleanup occurs when object is destroyed or context is exited
        - If the condition returns False at cleanup time, the cleanup function
          is dropped without being called
    """
    def __init__(self, func1: Callable, func2: Optional[Callable] = None, *, condition: Callable = lambda: True):
        # Initialise state first so __del__ stays safe if func1() raises below.
        self._func = None
        self._condition = condition
        if func2:
            func1()
            func1 = func2
        self._func = func1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.__del__()

    def __del__(self):
        # Detach the callable before running it: if it raises, a later
        # __del__ / __exit__ must not run the cleanup a second time.
        func, self._func = getattr(self, '_func', None), None
        if func and self._condition():
            func()

lazyllm.common.FlatList.absorb(item)

Absorb elements into the list.

Parameters:

  • item

    Element to add, can be a single element or a list

Source code in lazyllm/common/common.py
    def absorb(self, item):
        """Absorb elements into the list.

        A list is merged element by element, ``None`` is ignored, and any
        other value is appended as a single element.

        Args:
            item: Element to add, can be a single element or a list
        """
        if item is None:
            return
        if isinstance(item, list):
            self.extend(item)
        else:
            self.append(item)