Data Plugins
1. Data Plugins 简介
DataConverterPlugin
1. DataConverterPlugin 简介
DataConverter 负责将非标准格式的数据集转换为 v1 的标准 Messages 格式。这使得用户可以继续使用现有的数据集(如 Alpaca 格式),而无需手动转换。针对自定义格式的数据集,用户也可以通过构建对应的自定义 DataConverter 插件,来负责其数据格式标准化。
当前,LLaMA-Factory 已内置了 Alpaca Converter 和 Pair Converter,这两类数据集可以直接使用对应的 converter 进行标准化,无需自定义转换器。
2. Alpaca Converter 详解
2.1 Alpaca 格式
Alpaca 格式是一种常见的指令微调数据格式:
{
"system": "You are a helpful assistant.",
"instruction": "Describe a process of making crepes.",
"input": "",
"output": "Making crepes is an easy and delicious process..."
}
2.2 Alpaca Converter 接口定义
class AlpacaSample(TypedDict, total=False):
"""Alpaca 格式数据样本结构
attr:
system (str, 可选): 系统提示信息(system prompt),用于设定对话背景或模型行为。
instruction (str, 可选): 用户指令(user instruction),通常为任务描述。
input (str, 可选): 额外的输入内容(input text),可与 instruction 拼接。
output (str, 可选): 模型生成的目标输出(expected response)。
"""
...
def alpaca_converter(raw_sample: AlpacaSample) -> SFTSample:
"""将 Alpaca 样本转换为 SFT(Supervised Fine-Tuning)标准样本格式
`alpaca_converter` 将 Alpaca 数据集中一条样本转换为通用的 `SFTSample` 格式
该格式用于监督微调(SFT)或多轮对话建模
转换逻辑:
- 若存在 `system` 字段,则生成一条系统消息,loss_weight = 0.0
- 若存在 `instruction` 或 `input` 字段,则合并为一条用户消息,loss_weight = 0.0
- 若存在 `output` 字段,则生成一条助手机器人回复消息,loss_weight = 1.0
args:
raw_sample (AlpacaSample): 原始 Alpaca 数据样本
return:
SFTSample: 转换后的标准化样本,格式如下:
{
"messages": [
{"role": "system", "content": [{"type": "text", "value": "..."}], "loss_weight": 0.0},
{"role": "user", "content": [{"type": "text", "value": "..."}], "loss_weight": 0.0},
{"role": "assistant", "content": [{"type": "text", "value": "..."}], "loss_weight": 1.0},
]
}
example:
>>> raw = {"instruction": "请将以下句子翻译成英文:", "input": "你好", "output": "Hello"}
>>> alpaca_converter(raw)
{
"messages": [
{"role": "user", "content": [{"type": "text", "value": "请将以下句子翻译成英文:你好"}], "loss_weight": 0.0},
{"role": "assistant", "content": [{"type": "text", "value": "Hello"}], "loss_weight": 1.0}
]
}
"""
2.3 转换过程
alpaca_converter 函数将 Alpaca 格式转换为标准格式,转换逻辑如下:
def alpaca_converter(raw_sample: AlpacaSample) -> SFTSample:
messages = []
# 1. 添加系统提示词(如果存在)
if "system" in raw_sample:
messages.append({
"role": "system",
"content": [{"type": "text", "value": raw_sample["system"]}],
"loss_weight": 0.0
})
# 2. 添加用户输入(instruction + input)
if "instruction" in raw_sample or "input" in raw_sample:
user_content = raw_sample.get("instruction", "") + raw_sample.get("input", "")
messages.append({
"role": "user",
"content": [{"type": "text", "value": user_content}],
"loss_weight": 0.0
})
# 3. 添加模型回复
if "output" in raw_sample:
messages.append({
"role": "assistant",
"content": [{"type": "text", "value": raw_sample["output"]}],
"loss_weight": 1.0
})
return {"messages": messages}
2.4 转换示例
输入(Alpaca 格式):
{
"instruction": "What is the capital of France?",
"input": "",
"output": "The capital of France is Paris."
}
输出(标准格式):
{
"messages": [
{
"role": "user",
"content": [{"type": "text", "value": "What is the capital of France?"}],
"loss_weight": 0.0
},
{
"role": "assistant",
"content": [{"type": "text", "value": "The capital of France is Paris."}],
"loss_weight": 1.0
}
]
}
3. 自定义转换器
3.1 创建自定义转换器
如果用户有自己的数据格式,可以轻松添加自定义转换器将其标准化,实现过程可参考如下示例:
# src/llamafactory/v1/plugins/data_plugins/converter.py
from typing import TypedDict, NotRequired
from ...extras.types import SFTSample
# 1. 定义输入格式的类型
class MyCustomSample(TypedDict, total=False):
question: str
answer: str
context: NotRequired[str]
# 2. 实现转换逻辑
def custom_converter(raw_sample: MyCustomSample) -> SFTSample:
messages = []
# 构建用户消息
user_text = raw_sample["question"]
if "context" in raw_sample:
user_text = f"Context: {raw_sample['context']}\n\nQuestion: {user_text}"
messages.append({
"role": "user",
"content": [{"type": "text", "value": user_text}],
"loss_weight": 0.0
})
# 构建助手消息
messages.append({
"role": "assistant",
"content": [{"type": "text", "value": raw_sample["answer"]}],
"loss_weight": 1.0
})
return {"messages": messages}
# 3. 注册 custom_converter
#src/llamafactory/v1/plugins/data_plugins/converter.py: CONVERTERS
CONVERTERS = {
"alpaca": alpaca_converter,
"custom": custom_converter, # 添加自定义转换器
}
3.2 使用自定义转换器
在 YAML 配置中指定转换器名称:
my_dataset:
file_name: custom_data.json
converter: custom
DataLoaderPlugin
1. DataLoaderPlugin 简介
DataLoaderPlugin 负责从本地文件加载数据集,当前支持如下文件格式:
JSON:
.jsonJSONL:
.jsonlCSV:
.csvParquet:
.parquetArrow:
.arrowText:
.txt
2. DataLoaderPlugin 接口定义
@dataclass
class DataLoaderPlugin:
"""数据加载插件(DataLoaderPlugin)
负责根据数据集信息(`DatasetInfo`)自动加载本地或远程数据集。
支持多种文件格式(如 CSV、JSON、Parquet、Text、Arrow),并可选择是否以流式方式加载。
通常由 `DataEngine` 调用,用于统一封装数据加载逻辑。
"""
args: DataArguments
"""数据参数对象,包含数据目录、缓存路径、分片等配置信息。"""
def _get_builder_name(self, path: str) -> Literal["arrow", "csv", "json", "parquet", "text"]:
"""获取数据集文件格式
根据输入文件路径自动判断应使用的 HuggingFace `load_dataset` 构建器类型。
通过文件扩展名推断数据类型,例如 `.csv`、`.jsonl`、`.parquet`、`.txt` 等。
args:
path (str): 数据集文件路径,用于识别文件类型。
return:
Literal["arrow", "csv", "json", "parquet", "text"]:
数据构建器名称,用于 `datasets.load_dataset()`。
example:
>>> _get_builder_name("data/train.jsonl")
"json"
"""
...
def auto_load_data(self, dataset_info: DatasetInfo) -> HFDataset:
"""根据传入的 `dataset_info` 自动选择合适的加载方式
args:
dataset_info (DatasetInfo): 数据集元信息,通常包含:
- `file_name`: 数据文件路径
- `split`: 数据划分(如 "train"、"test");
- `streaming`: 是否启用流式加载
return:
HFDataset: 加载完成的 Hugging Face 数据集对象。
example:
>>> plugin = DataLoaderPlugin(args)
>>> ds = plugin.auto_load_data({"file_name": "~/data.json", "split": "train"})
"""
...
def load_data_from_file(self, filepath: str, split: str, streaming: bool) -> HFDataset:
"""从文件或目录加载数据集
根据输入路径自动识别文件类型(CSV、JSON、Parquet、Text 等),
并通过 `datasets.load_dataset()` 加载数据集。
若 `streaming=True`,则将结果转换为迭代式数据集。
args:
filepath (str): 文件路径或目录路径。
split (str): 数据划分名称(如 "train"、"validation")。
streaming (bool): 是否启用流式加载模式。
return:
HFDataset: 加载后的数据集对象。
example:
>>> plugin.load_data_from_file("data/train.json", "train", False)
"""
...
DataIndexPlugin
1. DataIndexPlugin 简介
DataIndexPlugin 负责调整数据索引,支持通过配置 size, weight 等参数控制数据集样本数量和采样频率。
使用
size参数 限制使用的样本数量:
my_dataset:
file_name: large_dataset.json
size: 1000 # 只使用前 1000 个样本
使用
weight参数调整数据集在混合数据中的采样频率:
dataset_a:
file_name: data_a.json
weight: 1.0
dataset_b:
file_name: data_b.json
weight: 2.0 # dataset_b 的样本出现频率是 dataset_a 的 2 倍
说明:weight 参数适用于在多个数据集混合训练时,调整不同数据集的的采样频率
当
weight=1.0时,数据集按原始比例采样当
weight=2.0时,该数据集的索引会复制 2 倍,使其样本出现频率翻倍
2. DataIndexPlugin 接口定义
@dataclass
class DataIndexPlugin:
"""数据索引插件(DataIndexPlugin)
根据 `size` 和 `weight` 调整数据索引列表,控制数据集的样本数量和采样频率
通常在多数据集混合训练时使用,以控制不同数据集在总体样本中的占比。
在 `DataEngine.build_data_index` 中被自动调用,用于实现样本重采样或加权分布。
"""
def adjust_data_index(
self, data_index: list[tuple[str, int]], size: Optional[int], weight: Optional[float]
) -> list[tuple[str, int]]:
"""调整数据索引列表
根据 `size` 或 `weight` 参数对输入的数据索引进行采样、扩展或缩减。
若两个参数同时存在,将依次执行基于大小和基于权重的调整。
args:
data_index (list[tuple[str, int]]):
数据索引列表,每个元素为 `(dataset_name, sample_index)`。
size (Optional[int]):
目标样本数量,若指定则根据该数量裁剪或重复样本。
weight (Optional[float]):
数据集权重,用于控制数据集在混合训练中的采样比例。
return:
list[tuple[str, int]]:
调整后的数据索引列表。
example:
>>> plugin = DataIndexPlugin()
>>> adjusted = plugin.adjust_data_index([("ds1", i) for i in range(100)], size=50, weight=None)
>>> len(adjusted)
50
"""
...
def adjust_by_size(self, data_index: list[tuple[str, int]], size: int) -> list[tuple[str, int]]:
"""根据目标大小调整数据索引
通过裁剪或重复样本,使索引总数等于 `size`。
常用于统一不同数据集的样本数量。
args:
data_index (list[tuple[str, int]]):
原始数据索引列表。
size (int):
目标样本数量。
return:
list[tuple[str, int]]:
调整后长度等于 `size` 的数据索引列表。
example:
>>> plugin.adjust_by_size([("ds1", i) for i in range(10)], 20)
"""
...
def adjust_by_weight(self, data_index: list[tuple[str, int]], weight: float) -> list[tuple[str, int]]:
"""根据权重调整数据索引
通过加权采样或重复样本,使数据集样本出现频率符合指定权重。
常用于多数据源训练中按比例平衡样本。
args:
data_index (list[tuple[str, int]]):
原始数据索引列表。
weight (float):
数据集权重(相对比例,可与其他数据集共同归一化)。
return:
list[tuple[str, int]]:
调整后的加权数据索引列表。
example:
>>> plugin.adjust_by_weight([("ds1", i) for i in range(10)], 0.5)
"""
...
DataSelectorPlugin
1. DataSelectorPlugin 简介
DataSelectorPlugin 为 DataEngine提供基于索引访问数据的功能,由 DataEngine 的 __getitem__ 方法自动调用。
2. DataSelectorPlugin 接口定义
@dataclass
class DataSelectorPlugin:
"""根据索引选择数据集样本。
配合 `DataEngine` 使用,通过统一的 `data_index` 结构(包含数据集名与样本索引)来实现灵活的数据选择
"""
data_index: list[tuple[str, int]]
"""数据索引列表,每个元素为 (dataset_name, sample_index)。"""
def select(self, index: Union[slice, list[int], Any]) -> Union[tuple[str, int], list[tuple[str, int]]]:
"""选择数据集样本
根据输入类型从 `data_index` 中选择对应的样本索引
支持三种索引方式:
- 切片(slice):返回对应范围内的样本
- 索引列表(list[int]):返回指定索引处的多个样本
- 其他类型输入将触发异常。
args:
index (Union[slice, list[int], Any]): 数据样本索引
可以是切片(`slice`)或索引列表
return:
Union[tuple[str, int], list[tuple[str, int]]]:
- 若为单个索引:返回一个 `(dataset_name, sample_index)`
- 若为多个索引或切片:返回多个样本的列表
except:
Raises:
ValueError: 当输入索引类型不受支持时抛出。
...