参考文档
https://api-docs.deepseek.com/zh-cn/api/create-chat-completion
https://cloud.tencent.com/document/product/1772/115969
https://blog.csdn.net/qq_26144365/article/details/145469936
实现代码
import http.client
import json
from urllib.parse import urlencode, urlparse
class OpenAi:
def __init__(self, base_url, api_key='sk-xxxxxxxxxxx', model='deepseek-r1', max_tokens=1024, max_messages=2):
self.parsed_url = urlparse(base_url)
self.api_key = api_key
self.model = model
self.max_tokens = max_tokens
self.messagesMap = {}
self.max_messages = max_messages # 最大记录对话轮数
# 根据当前会话id记录聊天信息,当记录的聊天信息轮数超过最大值时自动清理最旧的数据
def setMessages(self, key, messages):
self.messagesMap.setdefault(key, []).extend(messages);
# print(f'\n当前{key}队列长度:{len(self.messagesMap[key])}')
while len(self.messagesMap[key]) > (self.max_messages * 2):
self.messagesMap[key].pop(0)
# print(f'\n处理后{key}队列长度:{len(self.messagesMap[key])}')
# 根据当前会话id获取历史聊天信息
def getMessages(self, key):
return self.messagesMap.setdefault(key, []);
# 对话,question:问题,key:当前会话id用于记录对话,role:自定义人设
def chatAi(self, question, key='default', role='You are a helpful assistant'):
# 创建连接
conn = None
if self.parsed_url.scheme == 'https':
conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port)
else:
conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port)
headers = {
'Content-Type': 'application/json; charset=utf-8',
# 'Accept': 'application/json',
'Authorization': 'Bearer ' + self.api_key
}
messages = [{'role': 'system', 'content': role}]
messages.extend(self.getMessages(key))
messages.append({'role': 'user', 'content': question})
data = {
'model': self.model,
# 'model': 'deepseek-r1:32b',
# 'model': 'deepseek-r1',
# 'model': 'deepseek-v3',
'messages': messages,
'max_tokens': self.max_tokens,
# 'max_tokens': 4096,
# 'max_tokens': 1024,
'temperature': 1.5, # 代码生成/数学解题: 0.0, 数据抽取/分析: 1.0, 通用对话: 1.3, 翻译: 1.3, 创意类写作/诗歌创作: 1.5
'stream': True
}
body = json.dumps(data).encode('utf-8') # 将字典转换为JSON字符串,并编码为UTF-8字节串
conn.request('POST', self.parsed_url.path, body=body, headers=headers)
response = conn.getresponse()
answer_reasoning_content = "" # 定义完整思考过程
answer_content = "" # 定义完整回复
is_answering = False # 判断是否结束思考过程并开始回复
if response.status == 200:
while True:
chunk = response.readline();
if not chunk:
break # 没有更多数据了
data = chunk.decode("utf-8")
# print(data);
if data.startswith("data: {"):
data = json.loads(data[6:])
# 处理usage信息
choices = data.get('choices', None)
if not choices:
print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
print(data['usage'])
continue
delta = choices[0]['delta']
# 处理空内容情况
reasoning_content = delta.get('reasoning_content', None) # 思考过程
content = delta.get('content', None) # 回复内容
if not reasoning_content and not content:
continue
# 处理开始回答的情况
if not reasoning_content and not is_answering:
print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
is_answering = True
# 处理思考过程
if reasoning_content:
print(reasoning_content, end='', flush=True)
answer_reasoning_content += reasoning_content
# 处理回复内容
elif content:
print(content, end='', flush=True)
answer_content += content
conn.close()
self.setMessages(key, [
{'role': 'user', 'content': question},
{'role': 'assistant', 'content': answer_content}
])
return answer_content
else:
print(f'\n请求失败,状态码: {response.status},内容: {response.read().decode("utf-8")}')
conn.close()
return "系统繁忙,请稍后重试!"
# 通过文件路径读取角色
def readRoleByFilePath(self, path):
text = '你是个乐于助人的助手'
try:
with open(path, 'r', encoding='utf-8') as file:
# 读取所有文本
text = file.read()
except Exception as e:
print(f"\n读取角色发生错误:{e}")
return text
if __name__ == "__main__":
try:
ai = OpenAi('https://api.lkeap.cloud.tencent.com/v1/chat/completions', 'sk-xxxxxxxxxxx', max_messages=3)
ai.chatAi('你好,能介绍一下自己吗?', '会话1', ai.readRoleByFilePath('角色.txt'))
except Exception as e:
print(f"\n发生错误:{e}")
调用腾讯云联网deepseek-r1
https://lke.cloud.tencent.com/lke#/app/home
https://cloud.tencent.com/document/product/1759/105561#5d0f7191-c1e6-4968-84c9-7825fb3a8fe1
如何获取 AppKey
在应用管理界面,找到您处于运行中的应用(需要先发布),单击调用,会弹出“调用信息”窗口。
在调用信息窗口可以看到 AppKey,单击复制即可。
curl 调用示例
curl -XPOST -vvv --no-buffer --location 'https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse' \
--header 'Content-Type: application/json' \
--data '{
"content": "消息内容",
"bot_app_key": "<your appkey>",
"visitor_biz_id": "<your visitor id>",
"session_id": "<your session_id>",
"visitor_labels": []
}'
postman 调用示例
实现代码
import http.client
import json
from urllib.parse import urlencode, urlparse
class TencentAi:
def __init__(self, base_url='https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse', bot_app_key='<your appkey>', visitor_biz_id='<your visitor id>', session_id='<your session_id>', visitor_labels=[]):
self.parsed_url = urlparse(base_url)
self.bot_app_key = bot_app_key # 应用密钥
self.visitor_biz_id = visitor_biz_id # 访客 ID(外部输入,建议唯一,标识当前接入会话的用户)
self.session_id = session_id # 会话 ID,用于标识一个会话(外部系统提供,建议不同的用户端会话传入不同的 session_id,否则同一个应用下的不同用户的消息记录会串掉)参数长度:2-64个字符
self.visitor_labels = visitor_labels # 知识标签(用于知识库中知识的检索过滤);知识标签结构为: [{"name": "subject", "values": ["语文", "数学"]}]
def chatAi(self, question, key='default', role='你是个乐于助人的助手'):
# 创建连接
conn = None
if self.parsed_url.scheme == 'https':
conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port)
else:
conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port)
headers = {
'Content-Type': 'application/json; charset=utf-8',
# 'Accept': 'application/json',
}
if len(key) < 2:
print(f'\n参数过短:{key}\n')
data = {
'content': question, # 消息内容,如果发送图片,在此传递 markdown 格式的图片链接,例如,其中图片链接需要可公有读。
# 'request_id': key, # 请求 ID,用于标识一个请求(作消息串联,建议每个请求使用不同的 request_id)
'bot_app_key': self.bot_app_key,
'visitor_biz_id': key,
'session_id': key,
# 'visitor_labels': self.visitor_labels,
'system_role': role
}
body = json.dumps(data).encode('utf-8') # 将字典转换为JSON字符串,并编码为UTF-8字节串
conn.request('POST', self.parsed_url.path, body=body, headers=headers)
response = conn.getresponse()
token_count = 0 # 本轮请求消耗 token 数(当包含多个过程时, 计算将汇总)
references = [] # 参考来源
is_thought = False # 判断是否开始思考过程
thought = "" # 定义完整思考过程
answer_content = "" # 定义完整回复
is_answering = False # 判断是否结束思考过程并开始回复
if response.status == 200:
# 设置一个块大小
while True:
chunk = response.readline();
if not chunk:
break # 没有更多数据了
data = chunk.decode("utf-8")
# print(data);
if data.startswith("data:{"):
data = json.loads(data[5:])
# 处理空类型情况
data_type = data.get('type', None)
if not data_type:
continue
# 处理错误
if 'error' == data_type:
print("\n" + "=" * 20 + "错误" + "=" * 20 + "\n")
print(f'code: {payload.get("code", "")}, message: {payload.get("message", "")}')
answer_content = "系统繁忙,请稍后重试!"
break
# 处理空内容情况
payload = data.get('payload', None)
if not payload:
continue
is_from_self = payload.get('is_from_self', False) # 消息是否由自己发出
if is_from_self:
# 消息上行敏感检测
if payload.get('is_evil', False):
print("\n" + "=" * 20 + "敏感信息" + "=" * 20 + "\n")
print(data)
answer_content = "存在敏感信息!"
break
else:
continue
else:
# 处理token_stat
if 'token_stat' == data_type:
token_count = payload.get('token_count', token_count)
continue
# 处理参考来源
if 'reference' == data_type:
references = payload.get('references', references)
# 处理开始思考过程
if 'thought' == data_type and not is_thought:
print("\n" + "=" * 20 + "开始思考" + "=" * 20 + "\n")
is_thought = True
# 处理思考
if 'thought' == data_type:
procedures = payload.get('procedures', [])
if len(procedures) > 0:
debugging = procedures[0].get('debugging', {})
content = debugging.get('content', '')
print(content[len(thought):], end='', flush=True)
thought = content
# 处理开始回答的情况
if 'reply' == data_type and not is_answering:
print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
is_answering = True
if 'reply' == data_type:
content = payload.get('content', '')
print(content[len(answer_content):], end='', flush=True)
answer_content = content
conn.close()
print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
print(token_count)
for i in range(len(references)):
url = references[i].get('url', '')
answer_content = answer_content.replace(f'[{i + 1}]', f'')
print("\n" + "=" * 20 + "补充参考来源" + "=" * 20 + "\n")
print(answer_content)
return answer_content
else:
print(f'\n请求失败,状态码: {response.status},内容: {response.read().decode("utf-8")}')
conn.close()
return "系统繁忙,请稍后重试!"
def readRoleByFilePath(self, path):
text = '你是个乐于助人的助手'
try:
with open(path, 'r', encoding='utf-8') as file:
# 读取所有文本
text = file.read()
except Exception as e:
print(f"\n读取角色发生错误:{e}")
return text
if __name__ == "__main__":
try:
ai = TencentAi('https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse', '应用密钥')
ai.chatAi('你好,能介绍一下自己吗?', '会话1', ai.readRoleByFilePath('角色.txt'))
except Exception as e:
print(f"\n发生错误:{e}")