-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_query_users.py
More file actions
122 lines (90 loc) · 3.64 KB
/
Copy pathtest_query_users.py
File metadata and controls
122 lines (90 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Script that exercises the query_users function with several sample queries and prints the results.
"""
import asyncio
from mcp_server import query_users, QueryUsersParams, USERS
from mcp.server.session import ServerSession
from mcp.server.fastmcp import Context
# Lightweight stand-in for the MCP Context object, used for testing.
class MockContext:
    """Minimal mock of the MCP ``Context`` logging helpers.

    Every logging coroutine prints the message to stdout with a level
    prefix instead of forwarding it to an MCP client.  ``debug`` is
    provided next to ``info``/``warning``/``error`` so that the mock does
    not raise ``AttributeError`` when the code under test logs at debug
    level, and extra keyword arguments are accepted and ignored so calls
    that pass additional logging data do not raise ``TypeError``.
    """

    @staticmethod
    def _emit(prefix: str, message: str) -> None:
        """Print *message* preceded by the level *prefix* and a colon."""
        print(f"{prefix}: {message}")

    async def debug(self, message: str, **_extra: object) -> None:
        """Print a debug-level message."""
        self._emit("🐛 DEBUG", message)

    async def info(self, message: str, **_extra: object) -> None:
        """Print an info-level message."""
        self._emit("ℹ️ INFO", message)

    async def warning(self, message: str, **_extra: object) -> None:
        """Print a warning-level message."""
        self._emit("⚠️ WARNING", message)

    async def error(self, message: str, **_extra: object) -> None:
        """Print an error-level message."""
        self._emit("❌ ERROR", message)
async def test_query_all_users():
    """Test 1: query with every filter unset and print all returned users.

    Builds ``QueryUsersParams`` with all four filters set to ``None``,
    awaits ``query_users`` with a ``MockContext``, then prints the status,
    the count and one line per returned user.
    """
    divider = "=" * 50
    print(f"\n{divider}")
    print("测试1: 查询所有用户")
    print(divider)

    no_filters = QueryUsersParams(
        name=None,
        min_age=None,
        max_age=None,
        email_contains=None,
    )
    outcome = await query_users(no_filters, MockContext())

    print(f"状态: {outcome.status}")
    print(f"找到 {outcome.count} 个用户")
    for entry in outcome.users:
        print(f" - {entry['id']}: {entry['name']} ({entry['age']}) - {entry['email']}")
async def test_query_by_name():
    """Test 2: query with only the ``name`` filter set to ``"Alice"``.

    Awaits ``query_users`` with a ``MockContext`` and prints the status,
    the count and the name, age and email of each returned user.
    """
    divider = "=" * 50
    print(f"\n{divider}")
    print("测试2: 按姓名查询 (name='Alice')")
    print(divider)

    name_filter = QueryUsersParams(
        name="Alice",
        min_age=None,
        max_age=None,
        email_contains=None,
    )
    outcome = await query_users(name_filter, MockContext())

    print(f"状态: {outcome.status}")
    print(f"找到 {outcome.count} 个用户")
    for entry in outcome.users:
        print(f" - {entry['name']} ({entry['age']}) - {entry['email']}")
async def test_query_by_age_range():
    """Test 3: query with ``min_age=27`` and ``max_age=30`` only.

    Awaits ``query_users`` with a ``MockContext`` and prints the status,
    the count and the name, age and email of each returned user.
    """
    divider = "=" * 50
    print(f"\n{divider}")
    print("测试3: 按年龄范围查询 (min_age=27, max_age=30)")
    print(divider)

    age_filter = QueryUsersParams(
        name=None,
        min_age=27,
        max_age=30,
        email_contains=None,
    )
    outcome = await query_users(age_filter, MockContext())

    print(f"状态: {outcome.status}")
    print(f"找到 {outcome.count} 个用户")
    for entry in outcome.users:
        print(f" - {entry['name']} (年龄: {entry['age']}) - {entry['email']}")
async def test_query_by_email():
    """Test 4: query with only ``email_contains`` set to ``"smith"``.

    Awaits ``query_users`` with a ``MockContext`` and prints the status,
    the count and the name and email of each returned user.
    """
    divider = "=" * 50
    print(f"\n{divider}")
    print("测试4: 按邮箱查询 (email_contains='smith')")
    print(divider)

    email_filter = QueryUsersParams(
        name=None,
        min_age=None,
        max_age=None,
        email_contains="smith",
    )
    outcome = await query_users(email_filter, MockContext())

    print(f"状态: {outcome.status}")
    print(f"找到 {outcome.count} 个用户")
    for entry in outcome.users:
        print(f" - {entry['name']} - {entry['email']}")
async def test_query_combined():
    """Test 5: query combining ``name="David"`` with an age range of 25-30.

    Awaits ``query_users`` with a ``MockContext`` and prints the status,
    the count and the name, age and email of each returned user.
    """
    divider = "=" * 50
    print(f"\n{divider}")
    print("测试5: 组合条件查询 (name='David', min_age=25, max_age=30)")
    print(divider)

    combined_filter = QueryUsersParams(
        name="David",
        min_age=25,
        max_age=30,
        email_contains=None,
    )
    outcome = await query_users(combined_filter, MockContext())

    print(f"状态: {outcome.status}")
    print(f"找到 {outcome.count} 个用户")
    for entry in outcome.users:
        print(f" - {entry['name']} (年龄: {entry['age']}) - {entry['email']}")
async def main():
    """Run every query scenario in order, framed by start and end banners.

    Prints the number of entries in ``USERS`` first, awaits each test
    coroutine one after another, and finishes with a completion banner.
    """
    print("\n🧪 开始测试 query_users 函数\n")
    print(f"数据库中共有 {len(USERS)} 个用户\n")

    # Scenarios are awaited strictly one at a time, in this order.
    scenarios = (
        test_query_all_users,
        test_query_by_name,
        test_query_by_age_range,
        test_query_by_email,
        test_query_combined,
    )
    for scenario in scenarios:
        await scenario()

    divider = "=" * 50
    print(f"\n{divider}")
    print("✅ 所有测试完成!")
    print(f"{divider}\n")
# Script entry point: run the async main() to completion only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())