Design Philosophy of the Data Processing Module
Basic Conventions
The data processing module follows one basic convention: every dataset being processed has the unified type List[dict].
- Dataset size: The number of elements in the List.
- Element dict: A single piece of data.
Processing Modes
- In the data processing flow, each operator completes the processing of the entire dataset before passing it to the next operator, until all operators have finished processing to obtain the final result. For example, after the dataset is processed by a filter operator, the filtered dataset is obtained, which is then processed by a scoring operator to obtain the scored dataset, and so on.
- Operators are classified into two types: single data processing operators and full data processing operators.
  - Single data processing operator: Processes one dict from the List[dict] at a time and returns the processed dict. The framework handles concurrency for these operators, applying the operator to each dict in the List[dict]. Examples include data cleaning operators based on regular expressions and data processing operators based on LLM. In single data processing mode, the framework automatically identifies the operator's return type and handles it accordingly:
    - Returns a dictionary (dict): Replaces the original single data.
    - Returns a list (List[dict]): Adds multiple new data entries.
    - Returns None: Keeps the passed data reference as is.
    - Returns an empty list: Deletes the data.
  - Full data processing operator: Processes the entire List[dict] dataset at once and returns the processed List[dict] dataset. The framework does not handle concurrency for these operators (users need to design it themselves); instead, it calls them sequentially, passing the entire List[dict] dataset to the operator. Examples include deduplication operators that work on the full dataset.
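The return-type rules for single data processing operators can be sketched in plain Python. This is only an illustration, not the framework's implementation: apply_single_operator is a hypothetical helper name, it runs sequentially rather than concurrently, and it assumes that a returned list stands in for the original record.

```python
from typing import Callable, List, Optional, Union

Record = dict
OperatorResult = Optional[Union[Record, List[Record]]]


def apply_single_operator(
    op: Callable[[Record], OperatorResult],
    dataset: List[Record],
) -> List[Record]:
    """Apply `op` to every record and merge results by return type (hypothetical helper)."""
    output: List[Record] = []
    for record in dataset:
        result = op(record)
        if result is None:
            # None: keep the record that was passed in, as is.
            output.append(record)
        elif isinstance(result, dict):
            # dict: the returned record replaces the original one.
            output.append(result)
        elif isinstance(result, list):
            # Non-empty list: add every returned record.
            # Empty list: nothing is added, so the record is deleted.
            output.extend(result)
        else:
            raise TypeError(f'Unsupported return type: {type(result).__name__}')
    return output


def drop_short(record: Record) -> OperatorResult:
    # Delete records whose text is shorter than 3 characters, keep the rest.
    return [] if len(record.get('text', '')) < 3 else None


def split_words(record: Record) -> OperatorResult:
    # Expand one record into one record per word.
    return [{'text': word} for word in record['text'].split()]


data = [{'text': 'hi'}, {'text': 'hello world'}]
kept = apply_single_operator(drop_short, data)
expanded = apply_single_operator(split_words, kept)
```

Here drop_short acts as a filter (empty list or None), while split_words shows one record expanding into several.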
Framework Features
- Different concurrency methods are adopted for different task types to improve performance:
  - Compute-intensive tasks, such as regular expression matching, use multi-process concurrency.
  - I/O-intensive tasks, such as data processing based on LLM, use multi-thread concurrency with dynamic task submission (the streaming concurrency processing algorithm: a producer-consumer model that avoids the bucket effect, where the slowest task in a batch holds up the rest).
  - In debug mode, single-thread sequential processing is used for easy debugging.
- Supports dynamic data storage (results are stored while concurrent processing is running) to avoid data loss caused by task exceptions, and adopts smart storage to avoid performance loss caused by frequent storage.
  - Each operator in the Pipeline automatically generates an independent storage path (in jsonl format) under the configured storage root directory, based on the operator name.
  - Smart storage dynamically adjusts the storage frequency based on data volume and processing speed.
- Supports a Resume function, allowing an interrupted task to continue from the point of interruption.
- Supports custom operators (functions/classes) for data processing.
  - Operator registration uses the decorator pattern and maintains IDE code navigation capabilities.
  - Both functions and classes are uniformly registered as class operators, so they are called in the same way.
  - In single data processing mode, the framework automatically identifies the data processing intent from the data type the operator returns.
  - Supports both single data processing mode and full data processing mode.
- Supports progress bar display for task progress.
Core logic of the streaming concurrency processing algorithm: first, submit an initial batch of tasks to the thread pool, up to the maximum concurrency. Then enter the core loop: wait for the first task to complete, immediately produce its result (or exception), and take the next task from the task iterator and submit it to the thread pool to fill the vacancy. This "complete, produce, replenish" loop continues until the task iterator is exhausted and all submitted tasks have been processed.
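The loop described above can be sketched with the standard library's concurrent.futures. This is a minimal sketch of the technique, not the framework's code; stream_map and its parameters are hypothetical names chosen for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, Tuple


def stream_map(
    func: Callable[[dict], dict],
    items: Iterable[dict],
    max_workers: int = 4,
) -> Iterator[Tuple[dict, object]]:
    """Yield (item, result_or_exception) pairs as soon as each task finishes."""
    task_iter = iter(items)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        # Step 1: submit an initial batch, up to the maximum concurrency.
        for item in task_iter:
            pending[pool.submit(func, item)] = item
            if len(pending) >= max_workers:
                break
        # Step 2: complete -> produce -> replenish until nothing is pending.
        while pending:
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for future in done:
                item = pending.pop(future)
                error = future.exception()
                yield item, (error if error is not None else future.result())
                # Fill the freed slot with the next task, if any remain.
                for next_item in task_iter:
                    pending[pool.submit(func, next_item)] = next_item
                    break


records = [{'text': f'item {i}'} for i in range(10)]
results = [
    result
    for _, result in stream_map(lambda r: {'text': r['text'].upper()}, records, max_workers=3)
]
```

Results arrive in completion order rather than input order, which is why each result is paired with the item that produced it.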
Registering Operators
1. Simplest Usage
If you are new to the framework, all you need to know is that the @data_register decorator registers an operator and automatically provides concurrency, storage, and resume capabilities; the framework handles the other details. For example, an operator that converts content to lowercase can be registered as follows:
from lazyllm.tools.data import data_register  # Import registrar

demo = data_register.new_group('demo')  # Create a category for operator grouping

@data_register('data.demo')
def process_lower(data: dict):
    data['content'] = data.get('content', '').lower()
    return data
For classes, registration can be quickly achieved through inheritance:
class ProcessLower(demo):
    def forward(self, data: dict):  # Implement processing logic by overriding the forward method
        data['content'] = data.get('content', '').lower()
        return data
Usage:
import lazyllm

inputs = {'content': 'Hello World'}
m1 = lazyllm.data.demo.process_lower()
m2 = lazyllm.data.demo.ProcessLower()
res1 = m1(inputs)
res2 = m2(inputs)
print('Function result:', res1)
# Function result: [{'content': 'hello world'}]
print('Class result:', res2)
# Class result: [{'content': 'hello world'}]
Below is a complete introduction on how to design and register operators.
2. Designing Operators
Operators can be functions or classes. For functions:
- The first parameter, data, is required, and its type is dict or List[dict]. Note that this parameter is passed lazily.
  - dict indicates single data processing mode (one dict at a time).
  - List[dict] indicates full data processing mode (the entire List[dict] dataset).
- The second parameter, input_key, specifies the key in data to be processed as input. It is optional and supports None (default), str, or List[str].
  - None indicates that the input key is handled by the user (i.e., no specific input key is given and the function handles it internally).
  - str indicates that a single key in data is used as input.
  - List[str] indicates that multiple keys in data are used as input.
- The third parameter, output_key, specifies the key under which the processed data is stored. It is optional and supports None (default), str, or List[str].
  - None indicates that the output key is the same as the input key.
  - str indicates that the output is placed in the corresponding key field of data.
  - List[str] indicates that multiple outputs are placed in multiple key fields of data.
Examples are as follows:
from typing import List

# Convert to uppercase, single data processing
def process_uppercase(data: dict, input_key='content'):  # Input single data, specify handling 'content' field
    # Read the `content` field, convert it to uppercase, and put it back in the original field
    data[input_key] = data.get(input_key, '').upper()
    return data  # Return processed dictionary

# Explicitly specify output key
def process_add_suffix(data: dict, input_key='content', output_key='output'):
    data[output_key] = data.get(input_key, '') + '_suffix'
    return data

# Specify multiple keys as input
def process_merge(data: dict, input_key=['key1', 'key2'], output_key='output'):
    data[output_key] = data[input_key[0]] + data[input_key[1]]
    return data

# Full data processing
def process_deduplicate(data: List[dict], input_key='content'):
    seen = set()
    deduplicated_data = []
    for item in data:
        value = item.get(input_key, '')
        if value not in seen:  # Keep only the first item for each distinct value
            seen.add(value)
            deduplicated_data.append(item)
    return deduplicated_data
Operators can also be classes. A class must implement either forward (single data processing) or forward_batch_input (full data processing), but not both. The data argument is passed to the forward method (this parameter is also passed lazily). An __init__ method can additionally be defined to accept other parameters. Classes are generally used as operators when shared resources need to be passed in; for example, a vocabulary filtering operator needs to be given its vocabulary. An example follows:
class WordTableFilter:
    def __init__(self, word_table, input_key='content', **kwargs):
        super().__init__(**kwargs)
        self.word_table = word_table
        self.input_key = input_key

    def forward(self, data: dict):
        content = data.get(self.input_key, '')
        for word in self.word_table:
            if word in content:
                data['filtered'] = True
                return data
        data['filtered'] = False
        return data
3. Importing Registrar and Registering
The framework provides the registrar data_register for registering operators. The registrar mainly provides the following capabilities:
- Provides a decorator for registering operators, in both function and class form.
- Adds concurrent processing (for single data processing operators).
- Adds dynamic storage and Resume.
- Adds progress bar display.
Registration examples are as follows:
from typing import List

# Import registrar
from lazyllm.tools.data import data_register

demo = data_register.new_group('demo')  # Create a category for operator grouping

# Decorator registers operator, defaults to single data processing operator (default rewrite_func='forward')
@data_register('data.demo')
def process_uppercase(data: dict, input_key='content'):
    ...  # Omitted processing logic

# Register as full data processing operator by setting parameter rewrite_func='forward_batch_input'
@data_register('data.demo', rewrite_func='forward_batch_input')
def process_deduplicate(data: List[dict], input_key='content'):
    ...  # Omitted processing logic

# Register class operator via inheritance
class WordTableFilter(demo):
    ...  # Omitted class implementation logic
Set the operator's concurrency mode via the registrar:
@data_register('data.demo', _concurrency_mode='thread')
def process_uppercase(data: dict, input_key='content'):
    ...  # Omitted processing logic
Note that there are three concurrency modes:
- thread: Multi-thread concurrency (using the streaming concurrency processing algorithm mentioned above), suitable for I/O-intensive tasks, such as data processing based on LLM.
- process: Multi-process concurrency (default concurrency number calculated based on CPU resources), suitable for compute-intensive tasks, such as regular expression matching.
- single: Single-thread sequential processing, suitable for debugging in Debug mode.
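To make the difference between the three modes concrete, here is a small sketch using only the standard library. It is not the framework's dispatcher: run_with_mode is a hypothetical helper, and its thread branch uses plain Executor.map for brevity instead of the streaming algorithm the framework is described as using.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, List, Optional


def run_with_mode(
    func: Callable[[dict], dict],
    dataset: List[dict],
    mode: str = 'single',
    max_workers: Optional[int] = None,
) -> List[dict]:
    """Apply `func` to each record using the chosen concurrency mode (hypothetical helper)."""
    if mode == 'single':
        # Sequential processing: easiest to step through in a debugger.
        return [func(record) for record in dataset]
    if mode == 'thread':
        # Threads suit I/O-bound work such as waiting on remote model calls.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(func, dataset))
    if mode == 'process':
        # Processes suit CPU-bound work. `func` must be picklable (a module-level
        # function), and on some platforms the caller needs an
        # `if __name__ == '__main__':` guard.
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(func, dataset))
    raise ValueError(f'Unknown concurrency mode: {mode!r}')


def to_upper(record: dict) -> dict:
    return {**record, 'text': record['text'].upper()}


records = [{'text': 'hello'}, {'text': 'world'}]
sequential = run_with_mode(to_upper, records, mode='single')
threaded = run_with_mode(to_upper, records, mode='thread', max_workers=2)
```

Both calls return the same records in input order; only the execution strategy differs.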
Using Registered Operators for Data Processing
Data Processing Pipeline Example
Registered operators can be combined using LazyLLM's pipeline. An example follows:
from lazyllm import pipeline
from lazyllm.tools.data import demo

# Prepare data
data = [
    {'text': 'hello world'},
    {'text': 'hello lazyllm'},
    {'text': 'hello world'},  # Duplicate data
]

# Build data processing pipeline
with pipeline() as ppl:
    # input_key matches the key in data
    ppl.upper = demo.process_uppercase(input_key='text')
    # input_key matches the key in the data produced by the previous step
    ppl.dedup = demo.process_deduplicate(input_key='text')
    # set_output sets the output path: the final result is exported as a jsonl file and
    # the absolute path of the export is returned. Note that this is the final result, not
    # an intermediate storage result; each operator also maintains its own intermediate results.
    ppl.add_suffix = demo.process_add_suffix(
        input_key='text',
        output_key='text_with_suffix',
    ).set_output('path/to/output')

# Execute data processing pipeline
result = ppl(data)  # Output is: path/to/output/**.jsonl file
Operator Wrapper Common Hyperparameters Example
The operator wrapper class LazyLLMDataBase supports some common hyperparameters for controlling concurrency methods, storage behavior, etc., which are passed directly when initializing the operator:
1. Concurrency Control:
# Provide finer-grained concurrency control
process_add_suffix(
    input_key='text',
    output_key='text_with_suffix',
    _concurrency_mode='thread',  # Concurrency mode: 'thread', 'process', 'single'
    _max_workers=48,  # Max concurrency number
)
Note that the precedence for concurrency settings is: parameters provided during operator initialization > parameters provided to the registrar > default values.
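The precedence rule can be illustrated with a few lines of Python. This is a sketch of the rule only: resolve_settings is a hypothetical helper, and the values in DEFAULTS are placeholders assumed for the example, not the framework's real defaults.

```python
from typing import Any, Dict

# Placeholder defaults, assumed for illustration only.
DEFAULTS: Dict[str, Any] = {'_concurrency_mode': 'single', '_max_workers': None}


def resolve_settings(
    registrar_kwargs: Dict[str, Any],
    init_kwargs: Dict[str, Any],
    defaults: Dict[str, Any] = DEFAULTS,
) -> Dict[str, Any]:
    """Merge settings so that init > registrar > defaults (hypothetical helper)."""
    resolved = dict(defaults)
    # Later sources override earlier ones; None means "not specified".
    for source in (registrar_kwargs, init_kwargs):
        for key, value in source.items():
            if value is not None:
                resolved[key] = value
    return resolved


from_registrar = resolve_settings({'_concurrency_mode': 'thread'}, {})
overridden = resolve_settings(
    {'_concurrency_mode': 'thread'},
    {'_concurrency_mode': 'process', '_max_workers': 48},
)
```

In the second call the value given at initialization ('process') wins over the value given to the registrar ('thread').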
2. Storage and Resume Control:
# Control storage behavior
process_uppercase(
    input_key='text',
    _save_data=True,  # Whether to enable intermediate result storage (default is True)
    _ignore_errors=True,  # Whether to ignore errors during processing (default is True; errors are written to the error log)
)
The framework will generate the following storage path structure based on the configured data_process_path (or default data_pipeline_res under the working directory):
-- working_directory
   |-- data_pipeline_res
       |-- process_uppercase                          # Operator 1 (folder name is the operator name)
           |-- process_uppercase_results.jsonl        # Stored intermediate result file
           |-- process_uppercase_results.jsonl.json   # Stored progress file
           |-- process_uppercase_error.jsonl          # Error log
       |-- process_deduplicate                        # Operator 2
           |-- ...
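The idea behind storing results as jsonl and resuming after an interruption can be sketched as follows. This is a deliberately simplified illustration, not the framework's mechanism: it processes records sequentially, assumes one output line per input record, and infers progress by counting saved lines instead of using a separate progress file (whose format is not described here). count_saved and process_with_resume are hypothetical names.

```python
import json
import os
import tempfile
from typing import Callable, List


def count_saved(results_path: str) -> int:
    """Return how many results are already stored in the jsonl file."""
    if not os.path.exists(results_path):
        return 0
    with open(results_path, 'r', encoding='utf-8') as f:
        return sum(1 for line in f if line.strip())


def process_with_resume(
    func: Callable[[dict], dict],
    dataset: List[dict],
    results_path: str,
) -> List[dict]:
    """Process only records not yet saved, appending each result as it is produced."""
    start = count_saved(results_path)
    with open(results_path, 'a', encoding='utf-8') as f:
        for record in dataset[start:]:
            f.write(json.dumps(func(record), ensure_ascii=False) + '\n')
            f.flush()  # Persist each result so an interruption loses no finished work.
    with open(results_path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


calls = []


def upper(record: dict) -> dict:
    calls.append(record['text'])
    return {'text': record['text'].upper()}


def flaky_upper(record: dict) -> dict:
    # Simulate a task that is interrupted at the third record.
    if record['text'] == 'c':
        raise RuntimeError('simulated interruption')
    return upper(record)


data = [{'text': t} for t in 'abcd']
with tempfile.TemporaryDirectory() as root:
    path = os.path.join(root, 'process_uppercase_results.jsonl')
    try:
        process_with_resume(flaky_upper, data, path)
    except RuntimeError:
        pass  # The first two results are already on disk.
    resumed = process_with_resume(upper, data, path)
```

On the second run only the records after the interruption point are processed, so each record is handled exactly once across both runs.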