diff --git a/dataflow/example/image_editing/prompts.jsonl b/dataflow/example/image_editing/prompts.jsonl deleted file mode 100644 index 529c86e..0000000 --- a/dataflow/example/image_editing/prompts.jsonl +++ /dev/null @@ -1,3 +0,0 @@ -{"conversations": [{"content": "Change the woman's clothes to a white dress.", "role": "user"}], "images": ["https://huggingface.co/datasets/OpenDCAI/dataflow-demo-image/resolve/main/seg_images/image1.png"], "edited_images": [""]} -{"conversations": [{"content": "Change the vase to red.", "role": "user"}], "images": ["https://huggingface.co/datasets/OpenDCAI/dataflow-demo-image/resolve/main/seg_images/image2.png"], "edited_images": [""]} -{"conversations": [{"content": "The woman is dancing with the prince in a sacred ballroom.", "role": "user"}], "images": ["https://huggingface.co/datasets/OpenDCAI/dataflow-demo-image/resolve/main/seg_images/image3.png"], "edited_images": [""]} \ No newline at end of file diff --git a/dataflow/example/image_gen/image_edit/human_inpaint.jpg b/dataflow/example/image_gen/image_edit/human_inpaint.jpg deleted file mode 100644 index b71e4e9..0000000 Binary files a/dataflow/example/image_gen/image_edit/human_inpaint.jpg and /dev/null differ diff --git a/dataflow/example/image_gen/image_edit/image.png b/dataflow/example/image_gen/image_edit/image.png deleted file mode 100644 index f38d190..0000000 Binary files a/dataflow/example/image_gen/image_edit/image.png and /dev/null differ diff --git a/dataflow/example/image_gen/image_edit/prompts.jsonl b/dataflow/example/image_gen/image_edit/prompts.jsonl deleted file mode 100644 index b63eca8..0000000 --- a/dataflow/example/image_gen/image_edit/prompts.jsonl +++ /dev/null @@ -1,5 +0,0 @@ -{"conversations": [{"content": "turn this image into cybor style", "role": "user"}], "images": ["../example_data/image_gen/image_edit/image.png"]} -{"conversations": [{"content": "Change the vase to red.", "role": "user"}], "images": ["../example_data/image_gen/image_edit/image.png"]} -{"conversations": [{"content": "Turn this cow into a dog.", "role": "user"}], "images": ["../example_data/image_gen/image_edit/image.png"]} -{"conversations": [{"content": "A pretty girl is playing with these toys.", "role": "user"}], "images": ["../example_data/image_gen/image_edit/image.png"]} -{"conversations": [{"content": "Complete the shadowed part into a generic person's portrait.", "role": "user"}], "images": ["../example_data/image_gen/image_edit/human_inpaint.jpg"]} diff --git a/dataflow/example/image_gen/multi_image_input_gen/prompts.jsonl b/dataflow/example/image_gen/multi_image_input_gen/prompts.jsonl deleted file mode 100644 index 1c3f987..0000000 --- a/dataflow/example/image_gen/multi_image_input_gen/prompts.jsonl +++ /dev/null @@ -1,5 +0,0 @@ -{"input_text":[{"content":"a photo of Bassist, 13–18 years old, Ponytail, Black T-shirt + jeans, Dark skin, white background"},{"content":"a photo of Conductor, 13–18 years old, Long, Dress, Brown skin, white background"},{"content":"a photo of Keyboardist, 30–39 years old, Ponytail, Sequined jacket, Tan skin, white background"},{"content":"a photo of Singer, 40–49 years old, Spiky, Stage suit, Fair skin, white background"}],"output_img_discript":[{"content":"Generate a naturalistic image based on the following description. The central elements must be seamlessly integrated, maintaining visual continuity without separation: A teenage bassist with dark skin stands confidently on a stage, her black T-shirt and jeans contrasting with her vibrant surroundings. Her hair, pulled back into a ponytail, sways gently as she sways to the rhythm. Nearby, a young conductor with long hair, dressed in an elegant dress that complements her brown skin, raises her baton gracefully, eyes intent as she guides the musicians with fluid movements. On the opposite side, a keyboardist in his thirties, his tan skin highlighted by the shimmering sequined jacket he wears, focuses intently on his instrument, fingers dancing across the keys. At the forefront, a singer in his forties commands the stage. His spiky hair and stage suit add a flair to his fair skin as he projects his voice powerfully, engaging both the visible audience and his fellow musicians. The spatial arrangement of these performers creates a dynamic harmony, each element playing a crucial role in the symphony of their shared performance."}]} -{"input_text":[{"content":"a photo of Seer, 30–39 years old, Bald, Robe, Fair skin, white background"},{"content":"a photo of Wizard, 40–59 years old, Hooded, Boots, Fair skin, white background"},{"content":"a photo of Wizard, Elder (60+ years old), Hooded, Cloak, Light skin, white background"},{"content":"a photo of Guardian, 30–39 years old, Long, Amulet + sash, Brown skin, white background"}],"output_img_discript":[{"content":"Generate a naturalistic image based on the following description. The central elements must be seamlessly integrated, maintaining visual continuity without separation: A Seer, appearing 30–39 years old with a shaved head, dons a flowing robe that contrasts elegantly with fair skin, standing in a sunlit setting. Nearby, a middle-aged Wizard, hooded and clad in sturdy boots, shares fair skin and exudes a presence befitting the bright day. Another Wizard, elder and possessing a distinguished visage, shrouded in a voluminous cloak, maintains an air of wisdom against the light skin illuminated by the sunlight. Completing the ensemble, a Guardian, also aged 30–39, showcases long, cascading locks and a striking brown complexion adorned with an amulet and sash that catch the eye as they stand under the radiant sky.\n\n The scene unfolds in bright daylight where the Seer, robed and serene, stands as if contemplating a shared mystery with the middle-aged Wizard. The latter, hooded with boots firmly planted on the ground, turns slightly toward the Seer, suggesting a silent exchange of arcane knowledge between them. In this illuminated space, the Elder Wizard, draped in his cloak, observes with a contemplative gaze, as though piecing together the wisdom imparted by the others. Meanwhile, the Guardian, resplendent with an amulet and sash, stands a step aside, poised protectively, yet engaged, fortifying the group with his presence. The interplay among these figures, under the bright canopy of daylight, creates a tapestry of companionship and mystical dialogue, each character's stance and attire contributing to their collaborative narrative in this scenic rendezvous."}]} -{"input_text":[{"content":"a photo of Defender, 40–49 years old, Ponytail, Security vest + black pants, Dark skin, white background"},{"content":"a photo of Villager, 30–39 years old, Braids, Armor, Dark skin, white background"},{"content":"a photo of Whispery white curtains brushing sun-warmed parquet"}],"output_img_discript":[{"content":"Generate a naturalistic image based on the following description. The central elements must be seamlessly integrated, maintaining visual continuity without separation: A Defender in his forties stands tall on a sun-warmed parquet floor, wearing a bright security vest and black pants. His ponytail sways gently as he focuses on maintaining the peace. Nearby, a Villager in her thirties, adorned in intricately designed armor, appears vigilant and strong with her braids cascading over her shoulders. The dark skin of both individuals contrasts beautifully with the whispery white curtains that brush softly against the floor. Sunlight filters through the curtains, casting warm hues on the parquet, creating a serene ambiance. The Defender and Villager seem poised for action, united in their attention to their surroundings and suggesting a readiness to protect the space they inhabit. The interaction is subtle yet significant, with the shared setting emphasizing their silent camaraderie in a peaceful yet alert atmosphere."}]} -{"input_text":[{"content":"a photo of Merchant, 20–29 years old, Braids, Apron + shirt, Light skin, white background"},{"content":"a photo of Solitaire Ring, Bronze, Magnetic Clasp, white background"},{"content":"a photo of Matte Lipstick Tube, Sand Shade, Airless Pump, white background"},{"content":"a photo of Blazer, Cream, Recycled Polyester, white background"}],"output_img_discript":[{"content":"Generate a naturalistic image based on the following description. The central elements must be seamlessly integrated, maintaining visual continuity without separation: A young merchant, aged between 20 and 29, with braids, light skin, and wearing an apron over a shirt, stands thoughtfully in a cozy boutique. Her fingers delicately clasp a bronze solitaire ring with a magnetic clasp, inspecting its intricate design before placing it on her finger. Beside her, a matte lipstick tube in a sand shade, featuring an airless pump, rests prominently on a display counter. Her gaze shifts to a pristine cream blazer crafted from recycled polyester, hanging elegantly on a near rack, its fabric catching the subtle ambient light of the shop. The scene is captured in a documentary neutral tone, highlighting the merchant's engagement with the products, as she considers integrating these elements into her ensemble."}]} -{"input_text":[{"content":"a photo of Squad mate, 30–39 years old, Shaved, Field jacket, Brown skin, white background"},{"content":"a photo of Gemstone Studs, PVD Coating, IP Plating, white background"},{"content":"a photo of Huggie Earrings, Hammered Texture, Cord Thread, white background"},{"content":"a photo of Tie Clip, Wood Accents, Hypoallergenic, white background"}],"output_img_discript":[{"content":"Generate a naturalistic image based on the following description. The central elements must be seamlessly integrated, maintaining visual continuity without separation: A squad mate, aged 30–39 with a shaved head and brown skin, stands confidently in a well-worn field jacket. The jacket, suitable for outdoor activities, serves as the perfect backdrop for the sparkling accessories that elevate the ensemble. The squad mate proudly wears gemstone studs that glisten with their PVD coating and IP plating, complementing the earthy tones of his attire while adding a sophisticated touch to his appearance.\n\nNearby, huggie earrings with a distinct hammered texture lightly dangle, catching the light with each subtle movement and adding a dynamic element to the scene. These earrings are artistically threaded with cord, presenting a harmonious blend of ruggedness and elegance.\n\nAdding to the refined aesthetic is a tie clip with wood accents, nestled securely on the fabric of the jacket. The hypoallergenic material ensures comfort, while the natural wood's warm tones resonate with the squad mate's earthy style. This meticulous combination of jewelry and apparel paints a picture of understated sophistication and attention to detail, as each element interacts seamlessly to create a cohesive and visually appealing narrative."}]} diff --git a/dataflow/example/image_gen/text2image/prompts.jsonl b/dataflow/example/image_gen/text2image/prompts.jsonl deleted file mode 100644 index d5b9f5a..0000000 --- a/dataflow/example/image_gen/text2image/prompts.jsonl +++ /dev/null @@ -1,20 +0,0 @@ -{"conversations": [{"content": "a fox darting between snow-covered pines at dusk", "role": "user"}]} -{"conversations": [{"content": "a kite surfer riding emerald waves under a cloudy sky", "role": "user"}]} -{"conversations": [{"content": "an elderly couple strolling through a flower market at midday", "role": "user"}]} -{"conversations": [{"content": "a vintage car parked by a desert highway at twilight", "role": "user"}]} -{"conversations": [{"content": "a group of dancers practicing in an abandoned warehouse under fluorescent lights", "role": "user"}]} -{"conversations": [{"content": "a cello player performing on a foggy riverside pier", "role": "user"}]} -{"conversations": [{"content": "a peregrine falcon diving over rocky cliffs at noon", "role": "user"}]} -{"conversations": [{"content": "a street artist spray-painting a mural in an urban alley at golden hour", "role": "user"}]} -{"conversations": [{"content": "a child blowing soap bubbles in a sunlit garden", "role": "user"}]} -{"conversations": [{"content": "a futuristic robot tending plants in a vertical greenhouse at night", "role": "user"}]} -{"conversations": [{"content": "a koi fish gliding among lily pads in a tranquil pond during spring rain", "role": "user"}]} -{"conversations": [{"content": "a surfer silhouette framed by a fiery sunset sky", "role": "user"}]} -{"conversations": [{"content": "a mountaineer reaching the summit during a full moon", "role": "user"}]} -{"conversations": [{"content": "a ballet dancer rehearsing in a grand theater before an audience", "role": "user"}]} -{"conversations": [{"content": "a lantern festival illuminating a mountain valley under a starlit sky", "role": "user"}]} -{"conversations": [{"content": "a jazz quartet playing in a rain-soaked street corner at midnight", "role": "user"}]} -{"conversations": [{"content": "a mechanic fixing a steaming engine in a rustic barn at sunset", "role": "user"}]} -{"conversations": [{"content": "a flock of geese flying over golden marshlands in autumn", "role": "user"}]} -{"conversations": [{"content": "a graffiti artist unveiling a colorful piece on a rooftop during a citywide art festival", "role": "user"}]} -{"conversations": [{"content": "a barista serving coffee on a bustling city sidewalk during morning rush hour", "role": "user"}]} diff --git a/dataflow/example/image_gen/unified_image_gen/prompts.jsonl b/dataflow/example/image_gen/unified_image_gen/prompts.jsonl deleted file mode 100644 index 665e222..0000000 --- a/dataflow/example/image_gen/unified_image_gen/prompts.jsonl +++ /dev/null @@ -1,5 +0,0 @@ -{"image_condition_input": [{"content": "A single wooden boat, isolated object, high detail, white background"}, {"content": "A futuristic cyberpunk neon sign, glowing, isolated object, white background"}, {"content": "A minimalist Scandinavian chair, clean design, isolated object, white background"}], "image_edit_input": [{"content": "Extract the main elements from these images and combine them into a new natural image with a 1:1 aspect ratio. 1"}]} -{"image_condition_input": [{"content": "Matte black wireless earbuds, isolated product, white background"}, {"content": "Smartwatch exploded view showing components, isolated product, white background"}, {"content": "Eco-friendly shampoo bottle with dew drops, isolated product, white background"}, {"content": "Sneakers with motion blur effect, isolated product, white background"}], "image_edit_input": [{"content": "Extract the main elements from these images and combine them into a new natural image with a 1:1 aspect ratio. 2"}]} -{"image_condition_input": [{"content": "Anime-style female mage figure, dynamic pose, isolated character, white background"}, {"content": "Cute chubby corgi astronaut, sticker style, isolated character, white background"}, {"content": "Steampunk inventor figure with goggles, isolated character, white background"}, {"content": "Medieval knight on horseback, isolated figure, white background"}], "image_edit_input": [{"content": "Extract the main elements from these images and combine them into a new natural image with a 1:1 aspect ratio. 3"}]} -{"image_condition_input": [{"content": "Modern glass house miniature model, isolated object, white background"}, {"content": "Futuristic transport pod, isolated object, white background"}, {"content": "Old European street lamp, isolated object, white background"}, {"content": "Minimalist tea set, isolated object, white background"}], "image_edit_input": [{"content": "Extract the main elements from these images and combine them into a new natural image with a 1:1 aspect ratio. 4"}]} -{"image_condition_input": [{"content": "Bowl of ramen with chashu and egg, isolated food item, white background"}, {"content": "Stack of blueberry pancakes with syrup, isolated food item, white background"}, {"content": "Artisanal sourdough loaf, isolated food item, white background"}, {"content": "Colorful poke bowl, isolated food item, white background"}], "image_edit_input": [{"content": "Extract the main elements from these images and combine them into a new natural image with a 1:1 aspect ratio. 5"}]} \ No newline at end of file diff --git a/dataflow/operators/core_vision/generate/prompted_image_edit_generator.py b/dataflow/operators/core_vision/generate/prompted_image_edit_generator.py index 9bca55d..1f9f2af 100644 --- a/dataflow/operators/core_vision/generate/prompted_image_edit_generator.py +++ b/dataflow/operators/core_vision/generate/prompted_image_edit_generator.py @@ -1,6 +1,7 @@ import os import pandas as pd import numpy as np +from pathlib import Path from dataflow.utils.registry import OPERATOR_REGISTRY from dataflow import get_logger @@ -26,6 +27,27 @@ def get_desc(lang: str = "en") -> str: if lang != "zh" else "基于给定的大量图片以及对应的编辑指令,生成对应的编辑结果" ) + + def _resolve_image_path(self, image_path: str, base_dir: str) -> str: + """ + Resolve image path. If it's a relative path, resolve it to an absolute path relative to base_dir. + If it's an absolute path, return it directly. + """ + if os.path.isabs(image_path): + return image_path + else: + return os.path.normpath(os.path.join(base_dir, image_path)) + + def _resolve_image_paths(self, image_paths, base_dir: str): + """ + Resolve image paths (supports single path or list of paths). If relative paths, resolve them relative to base_dir. + """ + if isinstance(image_paths, str): + return self._resolve_image_path(image_paths, base_dir) + elif isinstance(image_paths, list): + return [self._resolve_image_paths(item, base_dir) for item in image_paths] + else: + return image_paths def run( self, @@ -38,39 +60,125 @@ def run( if output_image_key is None: raise ValueError("At least one of output_key must be provided.") - # Read prompts into a DataFrame + self.logger = get_logger() + + # 获取 prompts.jsonl 文件所在目录,用于解析相对路径 + if hasattr(storage, 'first_entry_file_name') and storage.first_entry_file_name: + jsonl_file_path = Path(storage.first_entry_file_name).resolve() + base_dir = str(jsonl_file_path.parent) + self.logger.info(f"Using base directory for relative paths: {base_dir}") + else: + # 如果没有 first_entry_file_name,使用当前工作目录 + base_dir = os.getcwd() + self.logger.warning(f"Could not determine JSONL file location, using current working directory: {base_dir}") + + # 总是从原始输入文件读取所有记录 df = storage.read(output_type="dict") df = pd.DataFrame(df) if not isinstance(df, pd.DataFrame): raise ValueError("storage.read must return a pandas DataFrame") + + total = len(df) + self.logger.info(f"Read {total} records from input file") + + # 尝试从缓存文件读取已处理的结果(用于断点续跑) + cache_file_path = os.path.join(storage.cache_path, f"{storage.file_name_prefix}_step1.{storage.cache_type}") + cache_df = None + if os.path.exists(cache_file_path): + try: + cache_df = pd.DataFrame(storage._load_local_file(cache_file_path, storage.cache_type)) + self.logger.info(f"Found cache file with {len(cache_df)} records, will merge for resume functionality") + except Exception as e: + self.logger.warning(f"Failed to load cache file: {e}, will process all records") - # Initialize the output column with empty lists if output_image_key not in df.columns: df[output_image_key] = [[] for _ in range(len(df))] - - processed = 0 - total = len(df) + + # 如果有缓存,合并已处理的结果 + if cache_df is not None and output_image_key in cache_df.columns: + for idx in df.index: + if idx < len(cache_df): + cache_output = cache_df.at[idx, output_image_key] + if isinstance(cache_output, list) and len(cache_output) > 0 and cache_output[0] != "": + df.at[idx, output_image_key] = cache_output batch_prompts = [] + skipped_count = 0 for idx, row in df.iterrows(): if output_image_key in row.keys(): - if len(row[output_image_key]) > 0: - if row[output_image_key][0] != "": + output_value = row[output_image_key] + if isinstance(output_value, list) and len(output_value) > 0: + if output_value[0] != "": + skipped_count += 1 + self.logger.info(f"Skipping record {idx} (already processed: {output_value[0]})") continue if save_image_with_idx: - batch_prompts.append({"idx": idx, "image_path": df.at[idx, input_image_key], "prompt": df.at[idx, input_conversation_key][-1]["content"]}) + # 构建输入数据,支持多轮对话 + image_path = df.at[idx, input_image_key] + resolved_image_path = self._resolve_image_paths(image_path, base_dir) + + conversations = df.at[idx, input_conversation_key] + + # 提取 prompt:对于多轮对话,使用最后一条消息;对于单轮对话,也使用最后一条消息 + if isinstance(conversations, list) and len(conversations) > 0: + # 获取最后一条消息的内容 + last_message = conversations[-1] + if isinstance(last_message, dict) and "content" in last_message: + prompt_text = last_message["content"] + else: + prompt_text = str(last_message) + else: + prompt_text = "" + + prompt_data = { + "idx": idx, + "image_path": resolved_image_path, + "prompt": prompt_text, + } + + if isinstance(conversations, list) and len(conversations) > 1: + prompt_data["conversations"] = conversations + + batch_prompts.append(prompt_data) else: - batch_prompts.append((df.at[idx, input_image_key], df.at[idx, input_conversation_key][-1]["content"])) - generated = self.image_edit_serving.generate_from_input(batch_prompts) + image_path = df.at[idx, input_image_key] + resolved_image_path = self._resolve_image_paths(image_path, base_dir) + batch_prompts.append((resolved_image_path, df.at[idx, input_conversation_key][-1]["content"])) + + self.logger.info(f"Processing {len(batch_prompts)} records (skipped {skipped_count} already processed)") + if len(batch_prompts) == 0: + self.logger.info("No records to process, all records are already completed") + return + + try: + generated = self.image_edit_serving.generate_from_input(batch_prompts) + except Exception as e: + self.logger.error(f"Error during image generation: {e}") + import traceback + self.logger.error(traceback.format_exc()) + self.logger.warning("Processing failed, skipping cache write") + return + for idx, prompt in enumerate(batch_prompts): if save_image_with_idx: - df.at[prompt['idx'], output_image_key] = generated.get(f"sample_{prompt['idx']}", []) - + result = generated.get(f"sample_{prompt['idx']}", []) + df.at[prompt['idx'], output_image_key] = result + if result: + self.logger.info(f"Record {prompt['idx']} processed successfully, got {len(result)} image(s)") + else: + self.logger.warning(f"Record {prompt['idx']} processed but got empty result (may have failed)") else: if isinstance(prompt, tuple): prompt = prompt[1] - df.at[idx, output_image_key] = generated[idx] if isinstance(generated, list) else generated.get(prompt, []) + result = generated[idx] if isinstance(generated, list) else generated.get(prompt, []) + df.at[idx, output_image_key] = result - # Final flush of any remaining prompts - storage.media_key = output_image_key - storage.write(df) + try: + storage.media_key = output_image_key + storage.write(df) + self.logger.info(f"Saved {len(df)} records to cache") + except Exception as e: + self.logger.error(f"Failed to write cache: {e}") + import traceback + self.logger.error(traceback.format_exc()) + raise diff --git a/dataflow/operators/core_vision/generate/prompted_image_generator.py b/dataflow/operators/core_vision/generate/prompted_image_generator.py index 332bf7f..255734b 100644 --- a/dataflow/operators/core_vision/generate/prompted_image_generator.py +++ b/dataflow/operators/core_vision/generate/prompted_image_generator.py @@ -34,6 +34,8 @@ def run( output_image_key: str = "images", save_image_with_idx: bool = True, ): + logger = get_logger() + if output_image_key is None: raise ValueError("At least one of output_key must be provided.") @@ -47,9 +49,6 @@ def run( if output_image_key not in df.columns: df[output_image_key] = [[] for _ in range(len(df))] - processed = 0 - total = len(df) - prompts_and_idx = [] save_id_list = [] for idx, row in df.iterrows(): @@ -74,11 +73,15 @@ def run( storage.write(df) return + logger.info(f"Processing {len(prompts_and_idx)} prompts...") + if save_image_with_idx: batch_prompts = save_id_list else: batch_prompts = [p for p, _ in prompts_and_idx] + generated = self.t2i_serving.generate_from_input(batch_prompts) + for prompt, idx in prompts_and_idx: imgs = generated.get(prompt, []) if imgs is None: diff --git a/dataflow/serving/api_image_gen_serving.py b/dataflow/serving/api_image_gen_serving.py new file mode 100644 index 0000000..913150e --- /dev/null +++ b/dataflow/serving/api_image_gen_serving.py @@ -0,0 +1,750 @@ +# dataflow/serving/api_image_gen_serving.py +import os +import base64 +import requests +from typing import Any, List, Dict, Optional, Callable, Tuple +from PIL import Image +from io import BytesIO +from dataflow.core import VLMServingABC +from dataflow import get_logger + +try: + from google import genai + from google.genai import types + GEMINI_AVAILABLE = True +except ImportError: + GEMINI_AVAILABLE = False + + +class APIImageGenServing(VLMServingABC): + API_FORMAT_OPENAI = "openai" + API_FORMAT_GEMINI = "gemini" + + def __init__( + self, + api_url: str, + image_io, + Image_gen_task: str = "text2image", + batch_size: int = 4, + timeout: int = 300, + connect_timeout: int = 30, + api_format: str = "openai", + api_key: Optional[str] = None, + model_name: str = "dall-e-3", + ): + """ + :param api_url: Base URL of the cloud API (e.g. "https://api.openai.com/v1") + :param image_io: ImageIO instance, for saving generated images + :param Image_gen_task: Task type, "text2image" or "imageedit" + :param batch_size: Batch size + :param timeout: Request timeout (seconds) + :param api_format: API format type, "openai" or "gemini" (default is "openai") + :param api_key: API key (directly from parameters, not from environment variables) + :param model_name: Model name (OpenAI: "dall-e-3", Gemini: "gemini-2.5-flash-image", "gemini-3-pro-image", etc. (default is "dall-e-3")) + """ + self.api_url = api_url.rstrip("/") + self.image_io = image_io + self.image_gen_task = Image_gen_task + self.batch_size = batch_size + self.timeout = timeout + self.connect_timeout = connect_timeout + self.api_format = api_format + self.model_name = model_name + self.logger = get_logger() + + self.api_key = api_key + + if not self.api_key: + self.logger.warning("API key not provided. Some APIs may require authentication.") + + if api_format == "gemini": + if not GEMINI_AVAILABLE: + raise ImportError( + "google.genai library is required for Gemini API. " + "Please install it: pip install google-genai" + ) + + if not self.api_key: + raise ValueError("Gemini API key is required! Please provide it via --api_key parameter.") + + if self.api_url: + http_options = types.HttpOptions( + base_url=self.api_url, + timeout=None + ) + else: + http_options = types.HttpOptions(timeout=None) + + try: + self.gemini_client = genai.Client( + api_key=self.api_key, + http_options=http_options + ) + except Exception as e: + self.logger.error(f"Failed to initialize Gemini client: {e}") + raise + else: + self.gemini_client = None + + if api_format not in ["openai", "gemini"]: + raise ValueError(f"Unsupported api_format: {api_format}. Only 'openai' and 'gemini' are supported.") + + def _build_openai_request(self, user_input: Any) -> Dict[str, Any]: + """ + Build a single OpenAI image generation request (supports DALL-E 2/3 and gpt-image-1) + """ + if isinstance(user_input, dict): + prompt = user_input.get("text_prompt", user_input.get("prompt", "")) + else: + prompt = str(user_input) + + request = { + "prompt": prompt, + "n": 1, # number of images to generate (1-10) + } + + # Set default size based on model + # DALL-E 2: "256x256", "512x512", "1024x1024" + # DALL-E 3: "1024x1024", "1024x1792", "1792x1024" + # GPT Image: "1024x1024", "1024x1792", "1792x1024" + if self.model_name and "dall-e-2" in self.model_name.lower(): + request["size"] = "1024x1024" # default size for DALL-E 2 + else: + request["size"] = "1024x1024" # default size for DALL-E 3 and GPT Image + + if isinstance(user_input, dict): + user_size = user_input.get("size") + if user_size: + request["size"] = user_size + + if self.model_name: + request["model"] = self.model_name + + # GPT Image model supports quality parameter (low, standard, high) + if self.model_name and "gpt-image" in self.model_name.lower(): + quality = user_input.get("quality", "standard") if isinstance(user_input, dict) else "standard" + request["quality"] = quality + + # GPT Image supports transparent background + if isinstance(user_input, dict) and user_input.get("background") == "transparent": + request["background"] = "transparent" + # transparent background is recommended to use PNG or WebP format + request["response_format"] = "b64_json" + + return request + + def _parse_openai_response(self, response: Dict[str, Any], user_input: Any) -> Tuple[str, Optional[Image.Image]]: + """Parse OpenAI image generation/editing response, return (key, image)""" + if "data" in response and len(response["data"]) > 0: + first_item = response["data"][0] + + if "url" in first_item: + img_url = first_item["url"] + if img_url: + img_resp = requests.get(img_url, timeout=30) + img = Image.open(BytesIO(img_resp.content)).convert("RGB") + + if isinstance(user_input, dict): + key = str(user_input.get("sample_id", user_input.get("idx", user_input.get("text_prompt", "default")))) + else: + key = str(user_input) + return key, img + + # Check if there is base64 encoded image (if response_format="b64_json") + elif "b64_json" in first_item: + import base64 + b64_data = first_item["b64_json"] + img_data = base64.b64decode(b64_data) + img = Image.open(BytesIO(img_data)).convert("RGB") + + # Determine key + if isinstance(user_input, dict): + key = str(user_input.get("sample_id", user_input.get("idx", user_input.get("text_prompt", "default")))) + else: + key = str(user_input) + return key, img + + if isinstance(user_input, dict): + key = str(user_input.get("sample_id", user_input.get("idx", user_input.get("text_prompt", "default")))) + else: + key = str(user_input) + return key, None + + def _build_openai_edit_request(self, user_input: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """ + Build OpenAI image editing request (supports DALL-E 2 and gpt-image-1) + Returns: + (files_dict, data_dict): files dictionary (for multipart/form-data) and data dictionary + """ + if not isinstance(user_input, dict): + raise ValueError("Image editing requires dict input with 'image_path' and 'prompt'") + + image_path = user_input.get("image_path") or user_input.get("image") or user_input.get("input_image") + if not image_path: + raise ValueError("Image editing requires 'image_path' in input") + + prompt = user_input.get("prompt") or user_input.get("text_prompt", "") + if not prompt: + raise ValueError("Image editing requires 'prompt' in input") + + try: + with open(image_path, "rb") as f: + image_data = f.read() + except Exception as e: + raise ValueError(f"Failed to read image file {image_path}: {e}") + + files = { + "image": (os.path.basename(image_path), image_data, "image/png") + } + + data = { + "prompt": prompt, + "n": 1, + } + + # If there is mask (optional), add mask + mask_path = user_input.get("mask") or user_input.get("mask_path") + if mask_path: + try: + with open(mask_path, "rb") as f: + mask_data = f.read() + files["mask"] = (os.path.basename(mask_path), mask_data, "image/png") + except Exception as e: + self.logger.warning(f"Failed to read mask file {mask_path}: {e}") + + if self.model_name and "dall-e-2" in self.model_name.lower(): + data["size"] = user_input.get("size", "1024x1024") + else: + data["size"] = user_input.get("size", "1024x1024") + + # For gpt-image-1, can add quality parameter + if self.model_name and "gpt-image" in self.model_name.lower(): + quality = user_input.get("quality", "standard") + data["quality"] = quality + + return files, data + + def _build_openai_variation_request(self, user_input: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """ + Build OpenAI image variation request (only supports DALL-E 2) + Returns: + (files_dict, data_dict): files dictionary (for multipart/form-data) and data dictionary + """ + if not isinstance(user_input, dict): + raise ValueError("Image variation requires dict input with 'image_path'") + + image_path = user_input.get("image_path") or user_input.get("image") or user_input.get("input_image") + if not image_path: + raise ValueError("Image variation requires 'image_path' in input") + + try: + with open(image_path, "rb") as f: + image_data = f.read() + except Exception as e: + raise ValueError(f"Failed to read image file {image_path}: {e}") + + files = { + "image": (os.path.basename(image_path), image_data, "image/png") + } + + data = { + "n": 1, + "size": user_input.get("size", "1024x1024"), + } + + return files, data + + def _call_openai_edit_api(self, files: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]: + """Call OpenAI image editing API (/images/edits)""" + endpoint = f"{self.api_url}/v1/images/edits" + headers = { + "Authorization": f"Bearer {self.api_key}" + } + + timeout_tuple = (self.connect_timeout, self.timeout) + + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + resp = requests.post( + endpoint, + files=files, + data=data, + headers=headers, + timeout=timeout_tuple + ) + + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + + resp.raise_for_status() + return resp.json() + + except requests.exceptions.HTTPError as e: + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + else: + self.logger.error( + f"HTTP error {resp.status_code} for endpoint: {endpoint}\n" + f"Response: {resp.text[:500] if hasattr(resp, 'text') else 'N/A'}" + ) + raise + except Exception as e: + self.logger.error(f"Error calling OpenAI edit API: {e}") + raise + + def _call_openai_variation_api(self, files: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]: + """Call OpenAI image variation API (/images/variations)""" + endpoint = f"{self.api_url}/v1/images/variations" + headers = { + "Authorization": f"Bearer {self.api_key}" + } + + timeout_tuple = (self.connect_timeout, self.timeout) + + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + resp = requests.post( + endpoint, + files=files, + data=data, + headers=headers, + timeout=timeout_tuple + ) + + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + + resp.raise_for_status() + return resp.json() + + except requests.exceptions.HTTPError as e: + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + else: + self.logger.error( + f"HTTP error {resp.status_code} for endpoint: {endpoint}\n" + f"Response: {resp.text[:500] if hasattr(resp, 'text') else 'N/A'}" + ) + raise + except Exception as e: + self.logger.error(f"Error calling OpenAI variation API: {e}") + raise + + def _build_gemini_prompt(self, user_input: Any) -> str: + """Extract prompt text from user_input""" + if isinstance(user_input, dict): + prompt = user_input.get("text_prompt", user_input.get("prompt", "")) + else: + prompt = str(user_input) + return prompt + + def _load_image_for_gemini(self, image_path: str) -> Image.Image: + """Load image file, return PIL Image object (Gemini API can directly accept PIL Image)""" + try: + image = Image.open(image_path) + if image.mode != "RGB": + image = image.convert("RGB") + return image + except Exception as e: + self.logger.error(f"Failed to load image {image_path}: {e}") + raise + + def _load_images_for_gemini(self, image_paths: List[str]) -> List[Image.Image]: + """Load multiple image files, return PIL Image object list""" + images = [] + for image_path in image_paths: + try: + image = self._load_image_for_gemini(image_path) + images.append(image) + except Exception as e: + self.logger.warning(f"Failed to load image {image_path}: {e}, skipping...") + return images + + def _call_gemini_chat_api( + self, + conversations: List[Dict[str, str]], + image_paths: Optional[List[str]] = None, + aspect_ratio: Optional[str] = None, + resolution: Optional[str] = None + ) -> List[Image.Image]: + """ + Use chat mode for multi-round image editing + Returns: + list of images generated in the last round + """ + from google.genai import types + + config = types.GenerateContentConfig( + response_modalities=['TEXT', 'IMAGE'], + ) + + chat = self.gemini_client.chats.create( + model=self.model_name, + config=config + ) + + all_images = [] + + for turn_idx, turn in enumerate(conversations): + message = turn.get("content", "") + if not message: + continue + + contents = [] + + if turn_idx == 0 and image_paths: + for image_path in image_paths: + if isinstance(image_path, str): + image = self._load_image_for_gemini(image_path) + contents.append(image) + elif isinstance(image_path, Image.Image): + contents.append(image_path) + + contents.append(message) + + if turn_idx > 0 and (aspect_ratio or resolution): + image_config = types.ImageConfig() + if aspect_ratio: + image_config.aspect_ratio = aspect_ratio + if resolution: + image_config.image_size = resolution + + config = types.GenerateContentConfig( + image_config=image_config + ) + response = chat.send_message(contents, config=config) + else: + response = chat.send_message(contents) + + turn_images = [] + for part in response.parts: + if part.inline_data is not None: + inline_data = part.inline_data + if hasattr(inline_data, 'data'): + img_data = inline_data.data + img_bytes = BytesIO(img_data) + image = Image.open(img_bytes) + if image.mode != "RGB": + image = image.convert("RGB") + turn_images.append(image) + + if turn_images: + all_images.extend(turn_images) + + return all_images if all_images else None + + def _call_gemini_api(self, prompt: str, image_paths: Optional[List[str]] = None) -> List[Image.Image]: + """ + Call Gemini API to generate images using google.genai library + """ + contents = [] + + if image_paths and self.image_gen_task == "imageedit": + for image_path in image_paths: + if isinstance(image_path, str): + image = self._load_image_for_gemini(image_path) + contents.append(image) + elif isinstance(image_path, Image.Image): + contents.append(image_path) + + contents.append(prompt) + + response = self.gemini_client.models.generate_content( + model=self.model_name, + contents=contents, + ) + + images = [] + for part in response.parts: + if part.inline_data is not None: + inline_data = part.inline_data + if hasattr(inline_data, 'data'): + img_data = inline_data.data + img_bytes = BytesIO(img_data) + image = Image.open(img_bytes) + if image.mode != "RGB": + image = image.convert("RGB") + images.append(image) + + if images: + return images[0] if len(images) == 1 else images + else: + return None + + def _call_api(self, payload: Any, headers: Optional[Dict[str, str]] = None, endpoint_suffix: str = "generations") -> Dict[str, Any]: + """ + Send API request + """ + if headers is None: + headers = {} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + headers["Content-Type"] = "application/json" + + if self.api_format == "openai": + endpoint = f"{self.api_url}/v1/images/{endpoint_suffix}" + elif self.api_format == "gemini": + raise ValueError("Gemini API should use _call_gemini_api() method, not _call_api()") + else: + raise ValueError(f"Unsupported api_format: {self.api_format}") + + timeout_tuple = (self.connect_timeout, self.timeout) + + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + resp = requests.post( + endpoint, + json=payload if isinstance(payload, (dict, list)) else payload, + headers=headers, + timeout=timeout_tuple + ) + + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + + resp.raise_for_status() + return resp.json() + + except requests.exceptions.HTTPError as e: + if resp.status_code == 503 and attempt < max_retries - 1: + self.logger.warning( + f"503 Service Unavailable (attempt {attempt + 1}/{max_retries}). " + f"Retrying in {retry_delay}s..." + ) + import time + time.sleep(retry_delay * (attempt + 1)) + continue + else: + self.logger.error( + f"HTTP error {resp.status_code} for endpoint: {endpoint}\n" + f"Response: {resp.text[:500] if hasattr(resp, 'text') else 'N/A'}" + ) + raise + + def generate_from_input(self, user_inputs: List[Any]): + """Batch generate images, interface aligned with LocalImageGenServing""" + out_dict = {} + for start in range(0, len(user_inputs), self.batch_size): + batch_inputs = user_inputs[start:start + self.batch_size] + if not batch_inputs: + continue + batch_result = self.generate_from_input_one_batch(batch_inputs) + out_dict.update(batch_result) + return out_dict + + def generate_from_input_one_batch(self, user_inputs: List[Any]) -> Dict[str, List[Image.Image]]: + """ + Process a batch of inputs, return dictionary in {key: [Image, ...]} format. + + user_inputs format is aligned with LocalImageGenServing: + - text2image: List[dict], each dict contains "text_prompt" and "sample_id" + - imageedit: List[dict], each dict contains "image_path" and "prompt" etc. + """ + images_dict = {} + + try: + for user_input in user_inputs: + try: + if self.api_format == "openai": + if self.image_gen_task == "imageedit": + # Image editing task + try: + use_variations = user_input.get("use_variations", False) if isinstance(user_input, dict) else False + + if use_variations and self.model_name and "dall-e-2" in self.model_name.lower(): + # Use variations endpoint (only supported by DALL-E 2) + files, data = self._build_openai_variation_request(user_input) + response = self._call_openai_variation_api(files, data) + else: + # Use edits endpoint (supported by DALL-E 2 and gpt-image-1) + files, data = self._build_openai_edit_request(user_input) + response = self._call_openai_edit_api(files, data) + + key, img = self._parse_openai_response(response, user_input) + if img is not None: + # Save single image immediately + single_image_dict = {key: [img]} + self.image_io(single_image_dict) + images_dict.setdefault(key, []).append(img) + except Exception as e: + self.logger.error(f"Failed to process image editing request: {str(e)}") + continue + else: + # Image generation task (text2image) + payload = self._build_openai_request(user_input) + response = self._call_api(payload) + key, img = self._parse_openai_response(response, user_input) + if img is not None: + # Save single image immediately + single_image_dict = {key: [img]} + self.image_io(single_image_dict) + images_dict.setdefault(key, []).append(img) + + elif self.api_format == "gemini": + # If image editing task, check if using multi-turn chat mode + image_paths = None + conversations = None + use_chat_mode = False + prompt = None + + if self.image_gen_task == "imageedit": + if isinstance(user_input, dict): + # Check if there is conversation history (multi-turn editing) + conversations = user_input.get("conversations") + if conversations and isinstance(conversations, list) and len(conversations) > 1: + # Use chat mode for multi-turn editing + use_chat_mode = True + else: + # Single-turn mode: extract prompt + prompt = self._build_gemini_prompt(user_input) + + # Extract image paths (support multiple images) + image_path = user_input.get("image_path") or user_input.get("image") or user_input.get("input_image") or user_input.get("images") + + # Process image paths: maybe single path or path list + if image_path: + if isinstance(image_path, list): + image_paths = [path for path in image_path if isinstance(path, str)] + elif isinstance(image_path, str): + image_paths = [image_path] + else: + image_paths = [] + + if not image_paths and not use_chat_mode: + continue + elif not use_chat_mode: + continue + else: + continue + else: + # Text-to-image generation task + prompt = self._build_gemini_prompt(user_input) + + # Choose calling method based on whether there are multiple conversation turns + if use_chat_mode: + # Multi-turn chat mode + aspect_ratio = user_input.get("aspect_ratio") + resolution = user_input.get("resolution") + img_result = self._call_gemini_chat_api( + conversations=conversations, + image_paths=image_paths, + aspect_ratio=aspect_ratio, + resolution=resolution + ) + else: + # Single-turn mode + img_result = self._call_gemini_api(prompt, image_paths=image_paths) + + if img_result is not None: + if isinstance(user_input, dict): + if self.image_gen_task == "imageedit": + idx = user_input.get("idx") + if idx is not None: + key = f"sample_{idx}" + else: + key = str(user_input.get("prompt", "default")) + else: + key = str(user_input.get("sample_id", user_input.get("text_prompt", "default"))) + else: + key = str(user_input) + + # Process returned images + if isinstance(img_result, list): + for img_idx, img in enumerate(img_result): + img_key = f"{key}_{img_idx}" if len(img_result) > 1 else key + single_image_dict = {img_key: [img]} + self.image_io(single_image_dict) + images_dict.setdefault(key, []).append(img) + else: + single_image_dict = {key: [img_result]} + self.image_io(single_image_dict) + images_dict.setdefault(key, []).append(img_result) + + else: + raise ValueError(f"Unsupported api_format: {self.api_format}. Only 'openai' and 'gemini' are supported.") + + except requests.exceptions.Timeout as e: + self.logger.error( + f"Request timeout for input. Connect timeout: {self.connect_timeout}s, " + f"Read timeout: {self.timeout}s. Error: {str(e)}" + ) + self.logger.warning( + "If you're behind a firewall or proxy, you may need to configure it. " + "Alternatively, try using --serving_type local for local model." + ) + continue + except requests.exceptions.ConnectionError as e: + self.logger.error( + f"Connection error: Cannot connect to {self.api_url}. " + f"Please check your network connection or API endpoint. Error: {str(e)}" + ) + continue + except requests.exceptions.HTTPError as e: + if hasattr(e.response, 'status_code'): + if e.response.status_code == 503: + self.logger.error( + f"503 Service Unavailable: The API server is temporarily unavailable. " + f"Please check if the service is running or try again later. " + f"Endpoint: {getattr(e, 'request', {}).get('url', 'N/A')}" + ) + else: + self.logger.error( + f"HTTP {e.response.status_code} error: {str(e)}. " + f"Response: {e.response.text[:200] if hasattr(e.response, 'text') else 'N/A'}" + ) + else: + self.logger.error(f"HTTP error: {str(e)}") + continue + except Exception as e: + self.logger.error(f"Error processing single input: {str(e)}") + continue + + # Save images using image_io, keep the same return value as LocalImageGenServing + return self.image_io(images_dict) + + except Exception as e: + self.logger.error(f"Error in generate_from_input_one_batch: {str(e)}") + # Return empty dictionary to avoid pipeline crash + return {} + + def cleanup(self): + self.logger.info("APIImageGenServing cleanup completed") \ No newline at end of file diff --git a/dataflow/statics/pipelines/api_pipelines/image_editing_api_pipeline.py b/dataflow/statics/pipelines/api_pipelines/image_editing_api_pipeline.py new file mode 100644 index 0000000..ea6a99f --- /dev/null +++ b/dataflow/statics/pipelines/api_pipelines/image_editing_api_pipeline.py @@ -0,0 +1,124 @@ +import os +import argparse +from pathlib import Path +from dataflow.operators.core_vision import PromptedImageEditGenerator +from dataflow.serving.api_image_gen_serving import APIImageGenServing +from dataflow.utils.storage import FileStorage +from dataflow.io import ImageIO + + +class ImageEditingAPIPipeline(): + """ + Image Editing API Pipeline + Supported Models: + OpenAI format (api_format="openai"): dall-e-2, gpt-image-1 + Gemini format (api_format="gemini"): gemini-2.5-flash-image, gemini-3-pro-image-preview, etc. + """ + def __init__( + self, + api_format="gemini", + model_name="gemini-3-pro-image-preview", + batch_size=4, + first_entry_file_name=None, + cache_path="./cache_local/image_edit_api", + ): + current_file = Path(__file__).resolve() + project_root = current_file.parent.parent.parent.parent.parent + + if first_entry_file_name is None: + data_file = project_root / "dataflow" / "example" / "image_gen" / "image_edit" / "prompts_api.jsonl" + first_entry_file_name = str(data_file) + + self.storage = FileStorage( + first_entry_file_name=first_entry_file_name, + cache_path=cache_path, + file_name_prefix="dataflow_cache_step", + cache_type="jsonl" + ) + + if api_format not in ["gemini", "openai"]: + raise ValueError(f"Unsupported api_format: {api_format}. Only 'gemini' and 'openai' are supported for image editing.") + + api_key = os.environ.get("DF_API_KEY") + api_url = os.environ.get("DF_BASE_URL") + + if api_key is None: + raise ValueError("API key is required. Please set it via environment variable DF_API_KEY") + + if api_url is None: + if api_format == "gemini": + api_url = "https://generativelanguage.googleapis.com" + else: # openai + api_url = "https://api.openai.com/v1" + + image_save_path = str(project_root / "cache_local" / "image_edit_api") + + self.serving = APIImageGenServing( + api_url=api_url, + image_io=ImageIO(save_path=image_save_path), + Image_gen_task="imageedit", + batch_size=batch_size, + api_format=api_format, + model_name=model_name, + api_key=api_key, + ) + + self.image_edit_generator = PromptedImageEditGenerator( + image_edit_serving=self.serving, + save_interval=10 + ) + + def forward(self): + self.image_edit_generator.run( + storage=self.storage.step(), + input_image_key="images", + input_conversation_key="conversations", + output_image_key="output_image", + ) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Cloud API Image Editing Pipeline") + parser.add_argument( + '--api_format', + choices=['gemini', 'openai'], + default='gemini', + help='API format type: gemini (Google Gemini) or openai (OpenAI DALL-E 2 / gpt-image-1)' + ) + parser.add_argument( + '--model_name', + type=str, + default='gemini-3-pro-image-preview', + help='Model name: for gemini format use "gemini-2.5-flash-image" or "gemini-3-pro-image-preview"; for openai format use "dall-e-2" or "gpt-image-1"' + ) + parser.add_argument( + '--batch_size', + type=int, + default=4, + help='Batch size' + ) + parser.add_argument( + '--first_entry_file_name', + type=str, + default=None, + help='Input data file path (default uses example_data)' + ) + parser.add_argument( + '--cache_path', + type=str, + default="./cache_local/image_edit_api", + help='Cache path' + ) + args = parser.parse_args() + + if not os.environ.get("DF_API_KEY"): + parser.error("Environment variable DF_API_KEY is not set. Please use export DF_API_KEY=your_api_key to set it") + + model = ImageEditingAPIPipeline( + api_format=args.api_format, + model_name=args.model_name, + batch_size=args.batch_size, + first_entry_file_name=args.first_entry_file_name, + cache_path=args.cache_path, + ) + model.forward() + diff --git a/dataflow/statics/pipelines/api_pipelines/text_to_image_generation_api_pipeline.py b/dataflow/statics/pipelines/api_pipelines/text_to_image_generation_api_pipeline.py new file mode 100644 index 0000000..b39447d --- /dev/null +++ b/dataflow/statics/pipelines/api_pipelines/text_to_image_generation_api_pipeline.py @@ -0,0 +1,127 @@ +import os +import argparse +from pathlib import Path +from dataflow.operators.core_vision import PromptedImageGenerator +from dataflow.serving.api_image_gen_serving import APIImageGenServing +from dataflow.utils.storage import FileStorage +from dataflow.io import ImageIO +from dataflow import get_logger + + +class ImageGenerationAPIPipeline(): + """ + Text to Image Generation API Pipeline + Supported Models: + OpenAI format (api_format="openai"): dall-e-2, dall-e-3, gpt-image-1 + Gemini format (api_format="gemini"): gemini-2.5-flash-image, gemini-3-pro-image-preview, etc. + """ + def __init__( + self, + api_format="gemini", + model_name="gemini-3-pro-image-preview", + batch_size=4, + first_entry_file_name=None, + cache_path="./cache_local/text2image_api", + ): + current_file = Path(__file__).resolve() + project_root = current_file.parent.parent.parent.parent.parent + + if first_entry_file_name is None: + data_file = project_root / "dataflow" / "example" / "image_gen" / "text2image" / "prompts.jsonl" + first_entry_file_name = str(data_file) + + self.storage = FileStorage( + first_entry_file_name=first_entry_file_name, + cache_path=cache_path, + file_name_prefix="dataflow_cache_step", + cache_type="jsonl" + ) + + self.logger = get_logger() + + api_key = os.environ.get("DF_API_KEY") + api_url = os.environ.get("DF_BASE_URL") + + if api_key is None: + raise ValueError("API key is required. Please set it via environment variable DF_API_KEY") + + if api_url is None: + if api_format == "gemini": + api_url = "https://generativelanguage.googleapis.com" + else: + api_url = "https://api.openai.com/v1" + + image_save_path = str(project_root / "cache_local" / "text2image_api") + + self.serving = APIImageGenServing( + api_url=api_url, + image_io=ImageIO(save_path=image_save_path), + Image_gen_task="text2image", + batch_size=batch_size, + api_format=api_format, + model_name=model_name, + api_key=api_key, + ) + + self.text_to_image_generator = PromptedImageGenerator( + t2i_serving=self.serving, + save_interval=10 + ) + + def forward(self): + try: + self.text_to_image_generator.run( + storage=self.storage.step(), + input_conversation_key="conversations", + output_image_key="images", + ) + except Exception as e: + self.logger.error(f"Pipeline execution failed: {str(e)}") + raise + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Cloud API Image Generation Pipeline") + parser.add_argument( + '--api_format', + choices=['openai', 'gemini'], + default='gemini', + help='API format type: openai (OpenAI DALL-E) or gemini (Google Gemini)' + ) + parser.add_argument( + '--model_name', + type=str, + default='gemini-3-pro-image-preview', + help='Model name: for openai format use "dall-e-2", "dall-e-3", "gpt-image-1"; for gemini format use "gemini-2.5-flash-image" or "gemini-3-pro-image-preview", etc.' + ) + parser.add_argument( + '--batch_size', + type=int, + default=4, + help='Batch size' + ) + parser.add_argument( + '--first_entry_file_name', + type=str, + default=None, + help='Input data file path (default uses example_data)' + ) + parser.add_argument( + '--cache_path', + type=str, + default="./cache_local/text2image_api", + help='Cache path' + ) + args = parser.parse_args() + + if not os.environ.get("DF_API_KEY"): + parser.error("Environment variable DF_API_KEY is not set. Please use export DF_API_KEY=your_api_key to set it") + + model = ImageGenerationAPIPipeline( + api_format=args.api_format, + model_name=args.model_name, + batch_size=args.batch_size, + first_entry_file_name=args.first_entry_file_name, + cache_path=args.cache_path, + ) + model.forward() + diff --git a/dataflow/statics/pipelines/gpu_pipelines/image_editing_pipeline.py b/dataflow/statics/pipelines/gpu_pipelines/image_editing_pipeline.py index 0ad6fd9..4c7d35a 100644 --- a/dataflow/statics/pipelines/gpu_pipelines/image_editing_pipeline.py +++ b/dataflow/statics/pipelines/gpu_pipelines/image_editing_pipeline.py @@ -1,40 +1,38 @@ import os -import argparse +from pathlib import Path from dataflow.operators.core_vision import PromptedImageEditGenerator from dataflow.serving.local_image_gen_serving import LocalImageGenServing -from dataflow.serving.api_vlm_serving_openai import APIVLMServing_openai from dataflow.utils.storage import FileStorage from dataflow.io import ImageIO class ImageGenerationPipeline(): - def __init__(self, serving_type="local", api_url="http://123.129.219.111:3000/v1/"): + def __init__(self): + current_file = Path(__file__).resolve() + project_root = current_file.parent.parent.parent.parent.parent + + data_file = project_root / "dataflow" / "example" / "image_gen" / "image_edit" / "prompts_local.jsonl" + self.storage = FileStorage( - first_entry_file_name="../example_data/image_gen/image_edit/prompts.jsonl", - cache_path="./cache_local/image_editing", + first_entry_file_name=str(data_file), + cache_path="./cache_local/image_edit_local", file_name_prefix="dataflow_cache_step", cache_type="jsonl" ) - if serving_type == "local": - self.serving = LocalImageGenServing( - image_io=ImageIO(save_path=os.path.join(self.storage.cache_path, "target_images")), - hf_model_name_or_path="black-forest-labs/FLUX.1-Kontext-dev", # "black-forest-labs/FLUX.1-Kontext-dev" - hf_cache_dir="./cache_local", - hf_local_dir="./ckpt/models/", - Image_gen_task="imageedit", - batch_size=4, - diffuser_model_name="FLUX-Kontext", - diffuser_num_inference_steps=28, - diffuser_guidance_scale=3.5, - ) - elif serving_type == "api": - self.serving = APIVLMServing_openai( - api_url=api_url, - model_name="gemini-2.5-flash-image-preview", # try nano-banana - image_io=ImageIO(save_path=os.path.join(self.storage.cache_path, "target_images")), - # send_request_stream=True, # if use ip http://123.129.219.111:3000/ add this line - ) + image_save_path = str(project_root / "cache_local" / "image_edit_local") + + self.serving = LocalImageGenServing( + image_io=ImageIO(save_path=image_save_path), + hf_model_name_or_path="/mnt/DataFlow/hzy/lqh/models/FLUX.1-Kontext-dev", + hf_cache_dir="./cache_local", + hf_local_dir="./ckpt/models/", + Image_gen_task="imageedit", + batch_size=4, + diffuser_model_name="FLUX-Kontext", + diffuser_num_inference_steps=28, + diffuser_guidance_scale=3.5, + ) self.text_to_image_generator = PromptedImageEditGenerator( image_edit_serving=self.serving, @@ -51,15 +49,5 @@ def forward(self): if __name__ == "__main__": # This is the entry point for the pipeline - parser = argparse.ArgumentParser() - parser.add_argument( - '--serving_type', - choices=['local', 'api'], - default='api', - ) - parser.add_argument( - '--api_url', type=str, default='http://123.129.219.111:3000/v1/', - ) - args = parser.parse_args() - model = ImageGenerationPipeline(serving_type=args.serving_type, api_url=args.api_url) + model = ImageGenerationPipeline() model.forward() diff --git a/dataflow/statics/pipelines/gpu_pipelines/text_to_image_generation_pipeline.py b/dataflow/statics/pipelines/gpu_pipelines/text_to_image_generation_pipeline.py index 56a5f50..a7fc29d 100644 --- a/dataflow/statics/pipelines/gpu_pipelines/text_to_image_generation_pipeline.py +++ b/dataflow/statics/pipelines/gpu_pipelines/text_to_image_generation_pipeline.py @@ -7,19 +7,31 @@ class ImageGenerationPipeline(): def __init__(self): + # 使用绝对路径确保能找到文件 + from pathlib import Path + current_file = Path(__file__).resolve() + project_root = current_file.parent.parent.parent.parent.parent + prompts_file = project_root / "dataflow" / "example" / "image_gen" / "text2image" / "prompts.jsonl" + prompts_file = str(prompts_file) + self.storage = FileStorage( - first_entry_file_name="../example_data/image_gen/text2image/prompts.jsonl", - cache_path="./cache_local/text_to_image_generation", + first_entry_file_name=prompts_file, + cache_path="./cache_local/text2image_local", file_name_prefix="dataflow_cache_step", cache_type="jsonl" ) + image_save_path = str(project_root / "cache_local" / "text2image_local") + self.serving = LocalImageGenServing( - image_io=ImageIO(save_path=os.path.join(self.storage.cache_path, "condition_images")), + image_io=ImageIO(save_path=image_save_path), batch_size=4, - hf_model_name_or_path="/ytech_m2v5_hdd/CheckPoints/FLUX.1-dev", # "black-forest-labs/FLUX.1-dev" + hf_model_name_or_path="/mnt/DataFlow/hzy/lqh/models/FLUX.1-dev", # "black-forest-labs/FLUX.1-dev" hf_cache_dir="./cache_local", - hf_local_dir="./ckpt/models/" + hf_local_dir="./ckpt/models/", + diffuser_num_inference_steps=20, # 减少推理步数从50到20,加快速度 + diffuser_image_height=512, # 降低分辨率可以加快速度 + diffuser_image_width=512, ) self.text_to_image_generator = PromptedImageGenerator( diff --git a/dataflow/utils/storage.py b/dataflow/utils/storage.py index 6efd729..48fb770 100644 --- a/dataflow/utils/storage.py +++ b/dataflow/utils/storage.py @@ -109,7 +109,28 @@ def _load_local_file(self, file_path: str, file_type: str) -> pd.DataFrame: if file_type == "json": return pd.read_json(file_path) elif file_type == "jsonl": - return pd.read_json(file_path, lines=True) + import json + records = [] + with open(file_path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if line: + try: + record = json.loads(line) + records.append(record) + except json.JSONDecodeError as e: + import warnings + warnings.warn( + f"Skipping invalid JSON at line {line_num} in {file_path}: {e}", + UserWarning + ) + continue + + if not records: + raise ValueError(f"No valid JSON records found in {file_path}") + + df = pd.DataFrame(records) + return df elif file_type == "csv": return pd.read_csv(file_path) elif file_type == "parquet": @@ -148,11 +169,9 @@ def read(self, output_type: Literal["dataframe", "dict"]="dataframe") -> Any: ValueError: For unsupported file types or output types """ file_path = self._get_cache_file_path(self.operator_step) - self.logger.info(f"Reading data from {file_path} with type {output_type}") if self.operator_step == 0: source = self.first_entry_file_name - self.logger.info(f"Reading remote dataset from {source} with type {output_type}") if source.startswith("hf:"): from datasets import load_dataset _, dataset_name, *parts = source.split(":") @@ -206,7 +225,6 @@ def write(self, data: Any) -> Any: file_path = self._get_cache_file_path(self.operator_step + 1) os.makedirs(os.path.dirname(file_path), exist_ok=True) - self.logger.success(f"Writing data to {file_path} with type {self.cache_type}") if self.cache_type == "json": dataframe.to_json(file_path, orient="records", force_ascii=False, indent=2) elif self.cache_type == "jsonl":