Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
2406593
update new usage
zengkaiya Jan 24, 2026
5414854
test
happykeyan Jan 24, 2026
f56bc46
add api description
happykeyan Jan 27, 2026
1e20a25
add api_pipelines
happykeyan Jan 28, 2026
826e21c
update image_contextvqa api_pipelines
happykeyan Feb 2, 2026
132c8c3
Merge branch 'main' into main
haolpku Feb 3, 2026
e2f85d1
Merge branch 'OpenDCAI:main' into main
happykeyan Feb 5, 2026
201c048
11
chawuciren11 Feb 5, 2026
c6f1f27
111
chawuciren11 Feb 5, 2026
c750507
11
chawuciren11 Feb 5, 2026
da512c0
11
chawuciren11 Feb 5, 2026
9e1dd80
11
chawuciren11 Feb 5, 2026
41e2774
region-caption
chawuciren11 Feb 10, 2026
63b9f66
region-caption
chawuciren11 Feb 10, 2026
0b847fd
add image cpation and vqa api pipelines
zengkaiya Feb 11, 2026
86f1ead
Merge branch 'OpenDCAI:main' into main
zengkaiya Feb 11, 2026
509bc49
Merge branch 'main' of https://github.com/happykeyan/DataFlow-MM
zengkaiya Feb 11, 2026
e2f37ce
Merge branch 'OpenDCAI:main' into main
happykeyan Feb 12, 2026
597c0bb
update personalized_qa_generator
zengkaiya Feb 14, 2026
d1c4217
modify personalized_qa_generator
zengkaiya Feb 14, 2026
d291edb
region_caption
chawuciren11 Feb 21, 2026
115e0bc
1
chawuciren11 Feb 21, 2026
d50de7f
1
chawuciren11 Feb 21, 2026
bca9a21
Merge branch 'OpenDCAI:main' into main
happykeyan Feb 23, 2026
b3cae3b
11
chawuciren11 Feb 24, 2026
e158123
11
chawuciren11 Feb 24, 2026
a18e754
Merge branch 'main' of https://github.com/happykeyan/DataFlow-MM
chawuciren11 Feb 24, 2026
80e4b1f
old fix operators
zengkaiya Feb 24, 2026
a49e2e1
Merge branch 'main' of https://github.com/happykeyan/DataFlow-MM
zengkaiya Feb 24, 2026
bb8187b
fix data path
zengkaiya Feb 24, 2026
9c80a44
fix operators
zengkaiya Feb 24, 2026
5608042
fix model path
zengkaiya Feb 25, 2026
c66c6e1
1
chawuciren11 Feb 25, 2026
0e4b612
fix
Hhankyangg Feb 25, 2026
46caf2e
Merge branch 'main' of https://github.com/happykeyan/DataFlow-MM
Hhankyangg Feb 25, 2026
6aaf234
fix pipeline
Hhankyangg Feb 25, 2026
8e9cf72
fix pipeline
Hhankyangg Feb 25, 2026
b71d9ac
fix pipeline
Hhankyangg Feb 25, 2026
8499355
fix pipeline
Hhankyangg Feb 25, 2026
782af92
11
chawuciren11 Feb 25, 2026
1a79aa6
11
chawuciren11 Feb 25, 2026
349e2af
Merge branch 'main' of https://github.com/happykeyan/DataFlow-MM
Hhankyangg Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 75 additions & 69 deletions dataflow/operators/core_text/generate/prompt_templated_qa_generator.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import pandas as pd
from typing import List

from dataflow.utils.registry import OPERATOR_REGISTRY
from dataflow import get_logger

from dataflow.utils.storage import FileStorage, DataFlowStorage
from dataflow.core import OperatorABC
from dataflow.core import LLMServingABC

from dataflow.core import OperatorABC, LLMServingABC
from dataflow.serving.local_model_vlm_serving import LocalModelVLMServing_vllm
from dataflow.serving.api_vlm_serving_openai import APIVLMServing_openai
from dataflow.prompts.prompt_template import NamedPlaceholderPromptTemplate


# 提取判断是否为 API Serving 的辅助函数
def is_api_serving(serving):
return isinstance(serving, APIVLMServing_openai)


@OPERATOR_REGISTRY.register()
class PromptTemplatedQAGenerator(OperatorABC):
"""
PromptTemplatedQAGenerator:
1) 从 DataFrame 读取若干字段(由 input_keys 指定)
2) 使用 prompt_template.build_prompt(...) 生成纯文本 prompt
3) 将该 prompt 与 image/video 一起输入多模态模型,生成答案

其中 prompt_template 需要实现:
build_prompt(self, need_fields: set[str], **kwargs) -> str
3) 将该 prompt 输入大语言模型,生成纯文本答案
"""

def __init__(
Expand All @@ -35,47 +38,28 @@ def __init__(

if self.prompt_template is None:
raise ValueError(
"prompt_template cannot be None for PromptTemplatedVQAGenerator."
"prompt_template cannot be None for PromptTemplatedQAGenerator."
)

@staticmethod
def get_desc(lang: str = "zh"):
if lang == "zh":
return (
"PromptTemplatedQAGenerator:先用模板填充文本 prompt,再"
"进行问答的算子。\n"
"JSONL/DataFrame 中包含若干字段(例如 descriptions、type 等),"
"通过 input_keys 将 DataFrame 列映射到模板字段,由 prompt_template 生成最终的文本 Prompt。"
"基于模板的纯文本问答算子 (PromptTemplatedQAGenerator)。\n"
"JSONL/DataFrame 中包含若干字段,通过 input_keys 将列映射到模板字段,\n"
"由 prompt_template 动态生成纯文本 Prompt,进行批量问答。\n\n"
"特点:\n"
" - 支持动态组装复杂的纯文本 Prompt\n"
" - 统一支持 API 和本地 Local 模型部署模式\n"
" - 全局 Batch 处理,极简代码结构\n"
)
else:
return (
"PromptTemplatedQAGenerator: a QA operator that first builds "
"text prompts from a prompt template and multiple input fields, then "
"performs QA."
)

def _prepare_batch_inputs(self, prompts):

prompt_list = []

for p in prompts:
raw_prompt = [
{"role": "system", "content": self.system_prompt},
{
"role": "user",
"content": [
{"type": "text", "text": p},
],
},
]
prompt = self.serving.processor.apply_chat_template(
raw_prompt, tokenize=False, add_generation_prompt=True
"PromptTemplatedQAGenerator: a pure text QA operator that builds "
"text prompts from a template and multiple input fields, then "
"performs QA inference."
)

prompt_list.append(prompt)

return prompt_list

def run(
self,
storage: DataFlowStorage,
Expand All @@ -87,66 +71,88 @@ def run(
- storage: DataFlowStorage
- output_answer_key: 输出答案列名
- **input_keys: 模板字段名 -> DataFrame 列名
例如:
descriptions="descriptions", type="type"

逻辑:
1. 从 DataFrame 每行抽取 input_keys 对应列,形成 key_dict
2. 用 prompt_template.build_prompt(need_fields, **key_dict) 得到文本 prompt
例如:descriptions="descriptions_col", type="type_col"
"""
if output_answer_key is None:
raise ValueError("output_answer_key must be provided.")
if not output_answer_key:
raise ValueError("'output_answer_key' must be provided.")

if len(input_keys) == 0:
raise ValueError(
"PromptTemplatedVQAGenerator requires at least one input key "
"PromptTemplatedQAGenerator requires at least one input key "
"to fill the prompt template (e.g., descriptions='descriptions')."
)

self.logger.info("Running PromptTemplatedQAGenerator...")
self.output_answer_key = output_answer_key

dataframe = storage.read("dataframe")
self.logger.info(f"Loading, number of rows: {len(dataframe)}")
# 1. 加载 DataFrame
dataframe: pd.DataFrame = storage.read("dataframe")
self.logger.info(f"Loaded dataframe with {len(dataframe)} rows")

use_api_mode = is_api_serving(self.serving)
if use_api_mode:
self.logger.info("Using API serving mode")
else:
self.logger.info("Using local serving mode")

# 2. 动态生成 Prompt 文本并组装标准对话结构
need_fields = set(input_keys.keys())
prompt_column = []
conversations_list = []

for idx, row in dataframe.iterrows():
key_dict = {}
for key in need_fields:
col_name = input_keys[key] # 模板字段名 -> DataFrame 列名
key_dict[key] = row[col_name]
# 安全获取值,防止 NaN 导致字符串格式化异常
val = row.get(col_name)
key_dict[key] = val if pd.notna(val) else ""

prompt_text = self.prompt_template.build_prompt(need_fields, **key_dict)
prompt_column.append(prompt_text)

# 统一组装为基类所需的消息格式
conversations_list.append([{"role": "user", "content": prompt_text}])

self.logger.info(
f"Using prompt_template to build prompts with fields {need_fields}, "
f"prepared {len(prompt_column)} prompts."
f"Built {len(conversations_list)} prompts using fields: {need_fields}"
)

prompt_list = self._prepare_batch_inputs(prompt_column)

outputs = self.serving.generate_from_input(
# 3. 统一调用基类接口进行纯文本推理 (无需传入 image_list/video_list)
outputs = self.serving.generate_from_input_messages(
conversations=conversations_list,
system_prompt=self.system_prompt,
user_inputs=prompt_list
)

dataframe[self.output_answer_key] = outputs
# 4. 保存结果
dataframe[output_answer_key] = outputs
output_file = storage.write(dataframe)
self.logger.info(f"Results saved to {output_file}")

return output_answer_key
return [output_answer_key]


# ==========================================
# 测试用例 (Main Block)
# ==========================================
if __name__ == "__main__":
model = LocalModelVLMServing_vllm(
hf_model_name_or_path="Qwen/Qwen2.5-VL-7B-Instruct",
vllm_tensor_parallel_size=1,
vllm_temperature=0.7,
vllm_top_p=0.9,
vllm_max_tokens=512,

# 使用 API 模式测试
model = APIVLMServing_openai(
api_url="http://172.96.141.132:3001/v1",
key_name_of_api_key="DF_API_KEY",
model_name="gpt-5-nano-2025-08-07",
image_io=None,
send_request_stream=False,
max_workers=10,
timeout=1800
)

# 如需测试 Local 模型,请解开注释 (VLM 模型同样能处理纯文本)
# model = LocalModelVLMServing_vllm(
# hf_model_name_or_path="Qwen/Qwen2.5-VL-7B-Instruct",
# vllm_tensor_parallel_size=1,
# vllm_temperature=0.7,
# vllm_top_p=0.9,
# vllm_max_tokens=512,
# )

TEMPLATE = (
"Descriptions:\n"
Expand All @@ -164,18 +170,18 @@ def run(
prompt_template=prompt_template,
)

# Prepare input
# 准备输入数据
storage = FileStorage(
first_entry_file_name="./dataflow/example/text_to_text/prompt_templated_qa.jsonl",
cache_path="./cache_prompted_qa",
file_name_prefix="prompt_templated_qa",
cache_type="jsonl",
)
storage.step() # Load the data
storage.step() # 加载数据

generator.run(
storage=storage,
output_answer_key="answer",
descriptions="descriptions",
type="type",
)
)
Loading