70 changes: 70 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,70 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

tdx2db reads A-share market data from a locally installed TongDaXin (TDX) terminal and incrementally syncs it into a database. It is the data-ingestion entry point of a quantitative analysis workstation.

## Common Commands

```bash
# Install dependencies
pip install -r requirements.txt

# One-shot incremental sync (daily + 5/15/30/60-minute bars); this one command covers day-to-day use
python main.py sync

# Sync individually
python main.py daily --db-only --auto-start --incremental
python main.py minutes --db-only --auto-start --incremental

# Sync the stock list
python main.py stock-list --db-only
```

There is no test suite. Verification means running the `sync` command and then inspecting the resulting rows in the database, for example with the sketch below.
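
A minimal verification sketch, assuming a PostgreSQL backend; the DSN and the script itself are illustrative, only the table and column names come from this file:

```python
# verify_sync.py: post-sync sanity check (illustrative, not part of the repo)
from sqlalchemy import create_engine, text

# Assumed DSN; build it from your .env values (DB_HOST, DB_NAME, DB_USER, ...)
engine = create_engine("postgresql://user:password@localhost/stockdb")

with engine.connect() as conn:
    # Row count and most recent trading date in the daily table
    rows, latest = conn.execute(text("SELECT COUNT(*), MAX(date) FROM daily_data")).one()
    print(f"daily_data: {rows} rows, latest date {latest}")

    # Most recent bar in each minute table
    for freq in (5, 15, 30, 60):
        latest = conn.execute(text(f"SELECT MAX(datetime) FROM minute{freq}_data")).scalar()
        print(f"minute{freq}_data: latest bar {latest}")
```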

## Architecture

A four-layer pipeline with one-way data flow (a wiring sketch follows the diagram):

```
CLI (cli.py)  →  Reader (reader.py)  →  Processor (processor.py)  →  Storage (storage.py)
      ↓                  ↓                        ↓                          ↓
  argparse          pytdx + local         validation + resample      SQLAlchemy batch writes
  command dispatch  .day/.lc5 files       + moving averages          incremental ON CONFLICT
  sync orchestration                      (OHLCV checks, resample,   table-name whitelist guard
                                          MA5~MA250)
```
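
A minimal sketch of one pass through the pipeline for a single stock's daily bars; the call chain mirrors `sync_all_daily_data` in `src/cli.py`, while the zero-argument constructors are assumptions:

```python
# One pass through the four layers for a single stock (constructors are assumed;
# the calls mirror sync_all_daily_data in src/cli.py).
from src.reader import TdxDataReader
from src.processor import DataProcessor
from src.storage import DataStorage

reader = TdxDataReader()      # Reader: pytdx + local vipdoc files
processor = DataProcessor()   # Processor: validation, resampling, MAs
storage = DataStorage()       # Storage: SQLAlchemy engine from config

raw = reader.read_daily_data(0, "sz000001")    # market=0 is Shenzhen
processed = processor.process_daily_data(raw)  # OHLCV checks + MA5~MA250
storage.save_incremental(processed, "daily_data",
                         conflict_columns=("code", "date"))
```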

- **cli.py**: besides command dispatch, `sync_all_daily_data` / `sync_all_min_data` / `sync_single_stock_min_data` orchestrate the per-stock streaming sync
- **config.py**: global singleton `config`, loaded from `.env` (TDX_PATH, DB_*)
- **logger.py**: global singleton `logger`

### Key Data Flows

Both daily and minute bars are processed **stock by stock in a streaming fashion**; the full dataset is never loaded into memory at once:

1. **Daily bars**: per stock, read `vipdoc/{sz,sh}/lday/*.day` → `process_daily_data()` validates OHLCV and computes moving averages → incremental insert into the `daily_data` table
2. **Minute bars**: per stock, read `.lc5` (5-minute) → `resample_ohlcv()` resamples to 15/30/60 minutes → `process_min_data()` validates and computes averages → written to the respective `minute{5,15,30,60}_data` tables
3. **Incremental sync**: `save_incremental()` uses a batched executemany with `ON CONFLICT DO NOTHING` (PostgreSQL) / `INSERT IGNORE` (MySQL) to skip duplicates; see the sketch below. Minute bars query each stock's latest stored datetime (`get_latest_datetime_by_code`); daily bars are likewise synced incrementally per stock.
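
A sketch of the duplicate-skipping insert described in item 3, assuming the PostgreSQL dialect; the real `save_incremental()` in `storage.py` may differ in details such as the whitelist check and return value:

```python
# Illustrative shape of a batched, duplicate-skipping insert (PostgreSQL dialect;
# on MySQL the statement would be INSERT IGNORE instead of ON CONFLICT).
import pandas as pd
from sqlalchemy.engine import Engine
from sqlalchemy import text

def save_incremental_sketch(engine: Engine, df: pd.DataFrame, table: str,
                            conflict_columns=("code", "date"),
                            batch_size: int = 1000) -> int:
    # The real storage layer whitelists table names before interpolating them.
    cols = list(df.columns)
    sql = text(
        f"INSERT INTO {table} ({', '.join(cols)}) "
        f"VALUES ({', '.join(':' + c for c in cols)}) "
        f"ON CONFLICT ({', '.join(conflict_columns)}) DO NOTHING"
    )
    inserted = 0
    with engine.begin() as conn:
        records = df.to_dict("records")
        for i in range(0, len(records), batch_size):
            # executemany-style call: one statement, a batch of parameter dicts
            # (note: rowcount semantics for batched executes vary by driver)
            inserted += conn.execute(sql, records[i:i + batch_size]).rowcount
    return inserted
```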

### Database Tables

| Table | Unique constraint | Purpose |
|------|----------|------|
| `daily_data` | (code, date) | daily bars |
| `minute{5,15,30,60}_data` | (code, datetime) | minute bars |
| `stock_info` | code | stock list |
| `block_stock_relation` | — | block/stock relations (not fully implemented) |

Unique constraints must be added manually by running `scripts/add_constraints.sql`; a sketch of what such statements look like follows.
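
The exact contents of `scripts/add_constraints.sql` are not reproduced here; below is a hypothetical equivalent matching the constraint columns in the table above (the constraint names are invented):

```python
# Hypothetical equivalent of scripts/add_constraints.sql (constraint names invented),
# runnable through SQLAlchemy against a PostgreSQL database.
from sqlalchemy import create_engine, text

DDL = [
    "ALTER TABLE daily_data ADD CONSTRAINT uq_daily_code_date UNIQUE (code, date)",
    *[
        f"ALTER TABLE minute{f}_data ADD CONSTRAINT uq_minute{f}_code_dt UNIQUE (code, datetime)"
        for f in (5, 15, 30, 60)
    ],
    "ALTER TABLE stock_info ADD CONSTRAINT uq_stock_code UNIQUE (code)",
]

engine = create_engine("postgresql://user:password@localhost/stockdb")  # assumed DSN
with engine.begin() as conn:
    for stmt in DDL:
        conn.execute(text(stmt))
```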

### Stock Code Format

Codes carry a market prefix: `sz000001`, `sh600000`. Shenzhen is market=0, Shanghai is market=1.
A-share filter rule: Shenzhen codes start with `000/001/002/300`, Shanghai codes with `60/688`; see the sketch below.
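
A minimal sketch of the filter rule; the helper name `is_a_share` is hypothetical, only the prefixes come from this file:

```python
# Hypothetical helper expressing the A-share filter rule stated above.
SZ_PREFIXES = ("000", "001", "002", "300")  # Shenzhen main board / SME / ChiNext
SH_PREFIXES = ("60", "688")                 # Shanghai main board / STAR Market

def is_a_share(code: str) -> bool:
    """Expects a prefixed code such as 'sz000001' or 'sh600000'."""
    market, number = code[:2], code[2:]
    if market == "sz":
        return number.startswith(SZ_PREFIXES)
    if market == "sh":
        return number.startswith(SH_PREFIXES)
    return False

assert is_a_share("sz000001") and is_a_share("sh688001")
assert not is_a_share("sz200001")  # Shenzhen B-share, filtered out
```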

## Configuration

Configuration lives in a `.env` file. Required keys: `TDX_PATH`, `DB_TYPE`, `DB_HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`. A sample follows.
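
A sample `.env`; every value below is a placeholder, only the key names are required by the repo:

```bash
# All values below are placeholders; only the key names are fixed
# Root of the local TDX installation (the directory containing vipdoc/)
TDX_PATH=C:/new_tdx
# postgresql or mysql
DB_TYPE=postgresql
DB_HOST=localhost
DB_NAME=stockdb
DB_USER=user
DB_PASSWORD=password
```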
249 changes: 175 additions & 74 deletions src/cli.py
@@ -6,15 +6,174 @@
import argparse
import sys
from argparse import Namespace
from typing import Optional

from datetime import timedelta

import pandas as pd
from tqdm import tqdm

from .reader import TdxDataReader
from .processor import DataProcessor
from .storage import DataStorage
from .config import config
from .logger import logger


def sync_single_stock_min_data(
reader: TdxDataReader,
processor: DataProcessor,
storage: DataStorage,
market: int,
code: str,
start_date: Optional[str] = None,
incremental: bool = True,
) -> bool:
"""处理并存储单只股票的分钟数据

Args:
reader: 数据读取器
processor: 数据处理器
storage: 数据存储器
market: 市场代码
code: 股票代码
start_date: 开始日期
incremental: 是否启用精确增量
"""
    # Precise incremental sync: look up this stock's latest stored datetime
    if incremental and not start_date:
        latest = storage.get_latest_datetime_by_code('minute5_data', code)
        if latest:
            start_date = (latest + timedelta(days=1)).strftime('%Y-%m-%d')
            logger.debug(f"{code} incremental start date: {start_date}")

    # Read the 5-minute bars
    df_5min = reader.read_5min_data(market, code)
    if df_5min.empty:
        logger.warning(f"{code} has no 5-minute data")
        return False

    # Ensure a datetime index
    if not pd.api.types.is_datetime64_any_dtype(df_5min['datetime']):
        df_5min['datetime'] = pd.to_datetime(df_5min['datetime'])
    df_5min['date'] = df_5min['datetime'].dt.date
    df_5min = df_5min.set_index('datetime')

    # Resample into the longer frequencies
    df_15min = DataProcessor.resample_ohlcv(df_5min, '15min')
    df_30min = DataProcessor.resample_ohlcv(df_5min, '30min')
    df_60min = DataProcessor.resample_ohlcv(df_5min, '60min')
    df_5min = df_5min.reset_index()

    # Process, filter, and store each frequency
    freq_data = [
        (df_5min, 5, 'minute5_data'),
        (df_15min, 15, 'minute15_data'),
        (df_30min, 30, 'minute30_data'),
        (df_60min, 60, 'minute60_data'),
    ]

    has_data = False
    for df, freq, table_name in freq_data:
        processed = processor.process_min_data(df)
        if start_date:
            processed = processor.filter_data_min(processed, start_date=start_date)
        if processed.empty:
            continue
        has_data = True
        if incremental:
            storage.save_incremental(processed, table_name)
        else:
            storage.save_minute_data(processed, freq=freq, to_csv=False, to_db=True)

    if has_data:
        logger.info(f"{code} minute data processed and written to the database")
    else:
        logger.debug(f"{code} has no new data to sync")

    return True


def sync_all_daily_data(
reader: TdxDataReader,
processor: DataProcessor,
storage: DataStorage,
start_date: Optional[str] = None,
) -> bool:
"""逐股票流式同步日线数据,避免全量加载到内存"""
    try:
        stocks = reader.get_stock_list()
        logger.info(f"Syncing daily data for {len(stocks)} stocks")

        iterator = tqdm(stocks.iterrows(), total=len(stocks)) if config.use_tqdm else stocks.iterrows()
        total_inserted = 0

        for _, stock in iterator:
            code = stock['code']
            market = 1 if code.startswith('sh') else 0
            try:
                data = reader.read_daily_data(market, code)
                if isinstance(data.index, pd.DatetimeIndex) or data.index.name == 'datetime':
                    data = data.reset_index()
                if data.empty:
                    continue

                processed = processor.process_daily_data(data)
                filtered = processor.filter_data(processed, start_date=start_date)
                if filtered.empty:
                    continue

                inserted = storage.save_incremental(
                    filtered, 'daily_data',
                    conflict_columns=('code', 'date'),
                    batch_size=config.db_batch_size
                )
                total_inserted += inserted
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.error(f"Error syncing daily data for {code}: {e}")
                continue

        if total_inserted > 0:
            logger.info(f"Daily data sync finished, {total_inserted} rows inserted")
        else:
            logger.info("Daily data is already up to date")
        return True
    except Exception as e:
        logger.error(f"Error syncing daily data: {e}")
        return False


def sync_all_min_data(
reader: TdxDataReader,
processor: DataProcessor,
storage: DataStorage,
start_date: Optional[str] = None,
) -> bool:
"""编排所有股票的分钟数据同步"""
    try:
        stocks = reader.get_stock_list()
        logger.info(f"Processing minute data for {len(stocks)} stocks")

        iterator = tqdm(stocks.iterrows(), total=len(stocks)) if config.use_tqdm else stocks.iterrows()

        for _, stock in iterator:
            code = stock['code']
            market = 1 if code.startswith('sh') else 0
            try:
                sync_single_stock_min_data(reader, processor, storage, market, code, start_date)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.error(f"Error processing minute data for {code}: {e}")
                continue

        return True
    except Exception as e:
        logger.error(f"Error processing minute data: {e}")
        return False

def parse_args() -> Namespace:
"""解析命令行参数

@@ -228,64 +387,22 @@ def main() -> int:

        # Fetch minute-bar data
if args.code and args.market is not None:
            # Fetch minute bars for a single stock
            data_list = reader.read_min_data(args.market, args.code)

            logger.info(f"Fetched {len(data_list)} kinds of minute-bar records")
            # Check the data

            if data_list[0].empty:
                logger.warning("No data fetched")
                return 0

            # [data_15min, data_30min, data_60min]
            logger.info(f"Generated {len(data_list[0])} 15-minute bar records")
            logger.info(f"Generated {len(data_list[1])} 30-minute bar records")
            logger.info(f"Generated {len(data_list[2])} 60-minute bar records")

            # Process the data
            # Single stock: route everything through sync_single_stock_min_data, covering all 5/15/30/60-minute frequencies
processor = DataProcessor()
            processed_data_list = []
            for i, data in enumerate(data_list):
                freq = [15, 30, 60][i]  # the corresponding minute frequency
                processed_data = processor.process_min_data(data)

                # Filter by date range
                filtered_data = processor.filter_data(
                    processed_data,
                    start_date=start_date,
                    end_date=args.end_date
                )

                if not filtered_data.empty:
                    processed_data_list.append((filtered_data, freq))
                    logger.info(f"{len(filtered_data)} {freq}-minute bar records after filtering")
                else:
                    logger.warning(f"No {freq}-minute data left after filtering")

            if not processed_data_list:
                logger.warning("No data left for any frequency after filtering")
            success = sync_single_stock_min_data(
                reader, processor, storage,
                args.market, args.code,
                start_date=start_date,
                incremental=incremental,
            )
            if not success:
                logger.warning(f"Stock {args.code} has no data to sync")
                return 0

            # Decide where to save
            to_csv = not args.db_only
            to_db = not args.csv_only

            # Save the data
            for filtered_data, freq in processed_data_list:
                table_name = f'minute{freq}_data'
                if to_csv:
                    storage.save_to_csv(filtered_data, table_name)
                if to_db:
                    if incremental:
                        storage.save_incremental(filtered_data, table_name, batch_size=config.db_batch_size)
                    else:
                        storage.save_to_database(filtered_data, table_name, batch_size=config.db_batch_size)
else:
            # Process minute bars for all stocks
            logger.info("Processing minute data for all stocks...")
processor = DataProcessor()
success = reader.process_and_store_min_data(storage, processor, start_date)
success = sync_all_min_data(reader, processor, storage, start_date)
if success:
logger.info("所有股票的分钟线数据处理完成")
else:
@@ -328,34 +445,18 @@ def main() -> int:
start_date = (latest + timedelta(days=1)).strftime('%Y-%m-%d')
logger.info(f"日线起始日期: {start_date}")

            data = reader.read_all_daily_data()
            if not data.empty:
                processed_data = processor.process_daily_data(data)
                filtered_data = processor.filter_data(processed_data, start_date=start_date)
                if not filtered_data.empty:
                    storage.save_incremental(
                        filtered_data, 'daily_data',
                        conflict_columns=('code', 'date'),
                        batch_size=config.db_batch_size
                    )
                else:
                    logger.info("Daily data is already up to date")
            else:
                logger.warning("No daily data fetched")
success = sync_all_daily_data(reader, processor, storage, start_date)
if not success:
logger.error("同步日线数据时出错")
has_error = True
except Exception as e:
logger.error(f"同步日线数据时出错: {e}")
has_error = True

        # 2. Sync minute data
        # 2. Sync minute data (precise per-stock incremental; no global start_date passed)
try:
logger.info("=== 同步分钟线数据 ===")
latest = storage.get_latest_datetime('minute15_data')
start_date = None
if latest:
start_date = (latest + timedelta(days=1)).strftime('%Y-%m-%d')
logger.info(f"分钟线起始日期: {start_date}")

success = reader.process_and_store_min_data(storage, processor, start_date)
success = sync_all_min_data(reader, processor, storage)
if not success:
logger.error("同步分钟线数据时出错")
has_error = True