-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
146 lines (112 loc) · 4.69 KB
/
server.py
File metadata and controls
146 lines (112 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#-*- encoding:utf-8 -*-
import logging
from typing import Optional
from xml.dom.minidom import parseString
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel, Field
from callbacks import dispatch
from config import load_config
from messager import WeComMessenger
from WXBizMsgCrypt3 import WXBizMsgCrypt # https://github.com/sbzhu/weworkapi_python project URL
# Configuration is loaded once at import time; every constant below is read from it.
config = load_config()

# Bind address used when this module is started as a script.
_server_section = config["server"]
SERVER_HOST = _server_section["host"]
SERVER_PORT = _server_section["port"]

# Credentials handed to WXBizMsgCrypt for the callback endpoints.
_callback_section = config["callback"]
CALLBACK_TOKEN = _callback_section["token"]
CALLBACK_ENCODING_AES_KEY = _callback_section["encoding_aes_key"]
CALLBACK_CORPID = _callback_section["corpid"]

# Tokens accepted by the /send and /recall endpoints; a set gives O(1) membership tests.
SEND_TOKENS = set(config["send"]["tokens"])

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Kept as a list because the helpers in this file address the crypto object as qy_api[0].
qy_api = [WXBizMsgCrypt(CALLBACK_TOKEN, CALLBACK_ENCODING_AES_KEY, CALLBACK_CORPID)]
# JSON body accepted by POST /send. Every field is required and must be non-empty.
# Documented with comments rather than a class docstring so the schema generated
# from this model keeps its current description.
class SendMessageRequest(BaseModel):
    # Recipient; forwarded as ``touser`` to the messenger.
    to_user: str = Field(default=..., min_length=1)
    # Text to deliver; wrapped as ``{"content": msg}`` by the /send handler.
    msg: str = Field(default=..., min_length=1)
    # Caller credential; must be a member of SEND_TOKENS.
    token: str = Field(default=..., min_length=1)
# JSON body accepted by POST /recall. Both fields are required and must be non-empty.
# Documented with comments rather than a class docstring so the schema generated
# from this model keeps its current description.
class RecallMessageRequest(BaseModel):
    # Identifier of the message to recall; forwarded as ``msgid`` to the messenger.
    msgid: str = Field(default=..., min_length=1)
    # Caller credential; must be a member of SEND_TOKENS.
    token: str = Field(default=..., min_length=1)
def verify_url(msg_signature: str, timestamp: str, nonce: str, echo_str: str) -> Optional[str]:
    """Run the callback-URL verification through ``qy_api[0].VerifyURL``.

    Args:
        msg_signature: Signature supplied with the verification request.
        timestamp: Timestamp supplied with the verification request.
        nonce: Nonce supplied with the verification request.
        echo_str: The ``echostr`` value to be verified.

    Returns:
        The echo value produced by ``VerifyURL`` when its return code is 0;
        ``None`` (after logging the code) for any other return code.
    """
    crypto = qy_api[0]
    status, plain_echo = crypto.VerifyURL(msg_signature, timestamp, nonce, echo_str)
    if status == 0:
        return plain_echo
    logger.error("ERR: VerifyURL ret: %s", status)
    return None
def decrypt_message(msg_signature: str, timestamp: str, nonce: str, data: str) -> Optional[str]:
    """Decrypt a callback payload through ``qy_api[0].DecryptMsg``.

    Args:
        msg_signature: Signature supplied with the callback request.
        timestamp: Timestamp supplied with the callback request.
        nonce: Nonce supplied with the callback request.
        data: The request body to decrypt.

    Returns:
        The message produced by ``DecryptMsg`` when its return code is 0;
        ``None`` (after logging the code) for any other return code.
    """
    crypto = qy_api[0]
    # Note the argument order: DecryptMsg takes the payload first.
    status, plain_message = crypto.DecryptMsg(data, msg_signature, timestamp, nonce)
    if status == 0:
        return plain_message
    logger.error("ERR: DecryptMsg ret: %s", status)
    return None
# GET /recv: callback-URL verification.
# Reads msg_signature, timestamp, nonce and echostr from the query string (each
# defaults to "" when absent), passes them to verify_url(), and answers with the
# verified echo as plain text, or "failed" with HTTP 400 when verification fails.
# Documented with comments rather than a docstring so the generated API
# description for this route is unchanged.
@app.get("/recv")
async def recv_verify(request: Request):
    query = request.query_params
    signature, stamp, nonce_value, encrypted_echo = (
        query.get(key, "") for key in ("msg_signature", "timestamp", "nonce", "echostr")
    )

    plain_echo = verify_url(signature, stamp, nonce_value, encrypted_echo)
    if plain_echo is not None:
        logger.info("Received GET request, echo_str: %s", plain_echo)
        return PlainTextResponse(plain_echo)
    return PlainTextResponse("failed", status_code=400)
def _first_text(root, tag: str) -> Optional[str]:
    """Return the data of the first child of the first ``tag`` element under ``root``.

    Returns ``None`` when the element is absent, has no child nodes, or its
    first child has no ``data`` attribute, instead of raising ``IndexError`` or
    ``AttributeError``.
    """
    elements = root.getElementsByTagName(tag)
    if not elements or not elements[0].childNodes:
        return None
    return getattr(elements[0].childNodes[0], "data", None)


# POST /recv: receive one encrypted callback message.
# Decrypts the request body with decrypt_message(), parses the result as XML and,
# for "text" and "image" messages, forwards (FromUserName, MsgType, payload) to
# dispatch(), where payload is Content for text and PicUrl for image. Any other
# message type is acknowledged without dispatching.
# Responds "ok" on success. Responds "failed" with HTTP 400 when the body is not
# valid UTF-8, decryption fails, the decrypted payload is not well-formed XML, or
# a required element is missing or empty. These cases are reported explicitly
# rather than surfacing as an unhandled exception.
# Documented with comments rather than a docstring so the generated API
# description for this route is unchanged.
@app.post("/recv")
async def recv_message(request: Request):
    # Imported locally so this handler does not depend on a change to the file's import block.
    from xml.parsers.expat import ExpatError

    query = request.query_params
    msg_signature = query.get("msg_signature", "")
    timestamp = query.get("timestamp", "")
    nonce = query.get("nonce", "")

    raw_data = await request.body()
    try:
        data = raw_data.decode("utf-8")
    except UnicodeDecodeError:
        logger.error("ERR: request body is not valid UTF-8")
        return PlainTextResponse("failed", status_code=400)

    decrypted = decrypt_message(msg_signature, timestamp, nonce, data)
    if decrypted is None:
        return PlainTextResponse("failed", status_code=400)

    try:
        root = parseString(decrypted).documentElement
    except ExpatError:
        logger.exception("ERR: decrypted payload is not well-formed XML")
        return PlainTextResponse("failed", status_code=400)

    msg_type = _first_text(root, "MsgType")
    if msg_type is None:
        logger.error("ERR: callback message has no MsgType")
        return PlainTextResponse("failed", status_code=400)

    # Element that carries the payload for each message type handled here.
    payload_tags = {"text": "Content", "image": "PicUrl"}
    payload_tag = payload_tags.get(msg_type)
    if payload_tag is None:
        # Unhandled message types are acknowledged without dispatching.
        return PlainTextResponse("ok")

    name = _first_text(root, "FromUserName")
    payload = _first_text(root, payload_tag)
    if name is None or payload is None:
        logger.error("ERR: %s message is missing FromUserName or %s", msg_type, payload_tag)
        return PlainTextResponse("failed", status_code=400)

    if msg_type == "text":
        logger.info("[ch0] %s:%s", name, payload)
    else:
        # Image payloads are URLs; only the sender is logged.
        logger.info("[ch0] %s:Image message", name)
    dispatch(name, msg_type, payload)
    return PlainTextResponse("ok")
# POST /send: deliver a text message through WeComMessenger.
# Rejects the request with HTTP 401 when payload.token is not in SEND_TOKENS.
# Otherwise sends payload.msg to payload.to_user and returns
# {"ok": True, "result": <messenger reply>} when the reply's "errcode" is 0;
# any other errcode raises HTTP 502 carrying the reply's "errmsg".
# Documented with comments rather than a docstring so the generated API
# description for this route is unchanged.
@app.post("/send")
def send_message(payload: SendMessageRequest):
    authorized = payload.token in SEND_TOKENS
    if not authorized:
        raise HTTPException(status_code=401, detail="Invalid token")

    reply = WeComMessenger().send_message(
        msgtype="text",
        touser=payload.to_user,
        content={"content": payload.msg},
        safe=0,
    )

    if reply.get("errcode") != 0:
        raise HTTPException(status_code=502, detail=f"Failed to send message: {reply.get('errmsg')}")
    return JSONResponse({"ok": True, "result": reply})
# POST /recall: recall a previously sent message through WeComMessenger.
# Rejects the request with HTTP 401 when payload.token is not in SEND_TOKENS.
# Otherwise recalls payload.msgid and returns
# {"ok": True, "result": <messenger reply>} when the reply's "errcode" is 0;
# any other errcode raises HTTP 502 carrying the reply's "errmsg".
# Documented with comments rather than a docstring so the generated API
# description for this route is unchanged.
@app.post("/recall")
def recall_message(payload: RecallMessageRequest):
    authorized = payload.token in SEND_TOKENS
    if not authorized:
        raise HTTPException(status_code=401, detail="Invalid token")

    reply = WeComMessenger().recall_message(msgid=payload.msgid)

    if reply.get("errcode") != 0:
        raise HTTPException(status_code=502, detail=f"Failed to recall message: {reply.get('errmsg')}")
    return JSONResponse({"ok": True, "result": reply})
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only here, so importing this module
    # elsewhere does not require it. The app is passed as the import string
    # "server:app" (presumably this file is server.py — confirm if it is renamed).
    from uvicorn import run as serve

    serve("server:app", host=SERVER_HOST, port=SERVER_PORT, reload=False)