参考文档

https://api-docs.deepseek.com/zh-cn/api/create-chat-completion
https://cloud.tencent.com/document/product/1772/115969
https://blog.csdn.net/qq_26144365/article/details/145469936

腾讯云申请密钥入口

实现代码

import http.client
import json
from urllib.parse import urlencode, urlparse

class OpenAi:
  def __init__(self, base_url, api_key='sk-xxxxxxxxxxx', model='deepseek-r1', max_tokens=1024, max_messages=2):
    self.parsed_url = urlparse(base_url)
    self.api_key = api_key
    self.model = model
    self.max_tokens = max_tokens

    self.messagesMap = {}
    self.max_messages = max_messages # 最大记录对话轮数

  # 根据当前会话id记录聊天信息,当记录的聊天信息轮数超过最大值时自动清理最旧的数据
  def setMessages(self, key, messages):
    self.messagesMap.setdefault(key, []).extend(messages);
    # print(f'\n当前{key}队列长度:{len(self.messagesMap[key])}')
    while len(self.messagesMap[key]) > (self.max_messages * 2):
      self.messagesMap[key].pop(0)
    # print(f'\n处理后{key}队列长度:{len(self.messagesMap[key])}')

  # 根据当前会话id获取历史聊天信息
  def getMessages(self, key):
    return self.messagesMap.setdefault(key, []);

  # 对话,question:问题,key:当前会话id用于记录对话,role:自定义人设
  def chatAi(self, question, key='default', role='You are a helpful assistant'):
    # 创建连接
    conn = None
    if self.parsed_url.scheme == 'https':
      conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port)
    else:
      conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port)
    
    headers = {
      'Content-Type': 'application/json; charset=utf-8',
      # 'Accept': 'application/json',
      'Authorization': 'Bearer ' + self.api_key
    }
    messages = [{'role': 'system', 'content': role}]
    messages.extend(self.getMessages(key))
    messages.append({'role': 'user', 'content': question})
    data = {
      'model': self.model,
      # 'model': 'deepseek-r1:32b',
      # 'model': 'deepseek-r1',
      # 'model': 'deepseek-v3',
      'messages': messages,
      'max_tokens': self.max_tokens,
      # 'max_tokens': 4096,
      # 'max_tokens': 1024,
      'temperature': 1.5, # 代码生成/数学解题: 0.0, 数据抽取/分析: 1.0, 通用对话: 1.3, 翻译: 1.3, 创意类写作/诗歌创作: 1.5
      'stream': True
    }
    body = json.dumps(data).encode('utf-8')  # 将字典转换为JSON字符串,并编码为UTF-8字节串
    
    conn.request('POST', self.parsed_url.path, body=body, headers=headers)
    response = conn.getresponse()

    answer_reasoning_content = ""  # 定义完整思考过程
    answer_content = ""     # 定义完整回复
    is_answering = False   # 判断是否结束思考过程并开始回复

    if response.status == 200:
      while True:
        chunk = response.readline();
        if not chunk:
          break # 没有更多数据了
        data = chunk.decode("utf-8")
        # print(data);
        if data.startswith("data: {"):
          data = json.loads(data[6:])

          # 处理usage信息
          choices = data.get('choices', None)
          if not choices:
            print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
            print(data['usage'])
            continue

          delta = choices[0]['delta']

          # 处理空内容情况
          reasoning_content = delta.get('reasoning_content', None) # 思考过程
          content = delta.get('content', None) # 回复内容
          if not reasoning_content and not content:
            continue

          # 处理开始回答的情况
          if not reasoning_content and not is_answering:
            print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
            is_answering = True

          # 处理思考过程
          if reasoning_content:
            print(reasoning_content, end='', flush=True)
            answer_reasoning_content += reasoning_content

          # 处理回复内容
          elif content:
            print(content, end='', flush=True)
            answer_content += content
      conn.close()
      self.setMessages(key, [
        {'role': 'user', 'content': question},
        {'role': 'assistant', 'content': answer_content}
      ])
      return answer_content
    else:
      print(f'\n请求失败,状态码: {response.status},内容: {response.read().decode("utf-8")}')
      conn.close()
      return "系统繁忙,请稍后重试!"

  # 通过文件路径读取角色
  def readRoleByFilePath(self, path):
    text = '你是个乐于助人的助手'
    try:
      with open(path, 'r', encoding='utf-8') as file:
        # 读取所有文本
        text = file.read()
    except Exception as e:
      print(f"\n读取角色发生错误:{e}")
    return text

if __name__ == "__main__":
  try:
    ai = OpenAi('https://api.lkeap.cloud.tencent.com/v1/chat/completions', 'sk-xxxxxxxxxxx', max_messages=3)
    ai.chatAi('你好,能介绍一下自己吗?', '会话1', ai.readRoleByFilePath('角色.txt'))
  except Exception as e:
      print(f"\n发生错误:{e}")

调用腾讯云联网deepseek-r1

https://lke.cloud.tencent.com/lke#/app/home
https://cloud.tencent.com/document/product/1759/105561#5d0f7191-c1e6-4968-84c9-7825fb3a8fe1

如何获取 AppKey

在应用管理界面,找到您处于运行中的应用(需要先发布),单击调用,会弹出“调用信息”窗口。
2025-02-21T08:56:57.png
在调用信息窗口可以看到 AppKey,单击复制即可。
2025-02-21T08:57:51.png

curl 调用示例

curl -XPOST -vvv --no-buffer --location 'https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse' \
--header 'Content-Type: application/json' \
--data '{
    "content": "消息内容",
    "bot_app_key": "<your appkey>",
    "visitor_biz_id": "<your visitor id>",
    "session_id": "<your session_id>",
    "visitor_labels": []
}'

postman 调用示例

2025-02-21T08:59:10.png

实现代码

import http.client
import json
from urllib.parse import urlencode, urlparse

class TencentAi:
  def __init__(self, base_url='https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse', bot_app_key='<your appkey>', visitor_biz_id='<your visitor id>', session_id='<your session_id>', visitor_labels=[]):
    self.parsed_url = urlparse(base_url)
    self.bot_app_key = bot_app_key # 应用密钥
    self.visitor_biz_id = visitor_biz_id # 访客 ID(外部输入,建议唯一,标识当前接入会话的用户)
    self.session_id = session_id # 会话 ID,用于标识一个会话(外部系统提供,建议不同的用户端会话传入不同的 session_id,否则同一个应用下的不同用户的消息记录会串掉)参数长度:2-64个字符
    self.visitor_labels = visitor_labels # 知识标签(用于知识库中知识的检索过滤);知识标签结构为: [{"name": "subject", "values": ["语文", "数学"]}]

  def chatAi(self, question, key='default', role='你是个乐于助人的助手'):
    # 创建连接
    conn = None
    if self.parsed_url.scheme == 'https':
      conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port)
    else:
      conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port)
    
    headers = {
      'Content-Type': 'application/json; charset=utf-8',
      # 'Accept': 'application/json',
    }
    if len(key) < 2:
      print(f'\n参数过短:{key}\n')
    data = {
      'content': question, # 消息内容,如果发送图片,在此传递 markdown 格式的图片链接,例如![](图片连接),其中图片链接需要可公有读。
      # 'request_id': key, # 请求 ID,用于标识一个请求(作消息串联,建议每个请求使用不同的 request_id)
      'bot_app_key': self.bot_app_key,
      'visitor_biz_id': key,
      'session_id': key,
      # 'visitor_labels': self.visitor_labels,
      'system_role': role
    }
    body = json.dumps(data).encode('utf-8')  # 将字典转换为JSON字符串,并编码为UTF-8字节串
    
    conn.request('POST', self.parsed_url.path, body=body, headers=headers)
    response = conn.getresponse()

    token_count = 0 # 本轮请求消耗 token 数(当包含多个过程时, 计算将汇总)
    references = [] # 参考来源
    is_thought = False # 判断是否开始思考过程
    thought = ""  # 定义完整思考过程
    answer_content = ""     # 定义完整回复
    is_answering = False   # 判断是否结束思考过程并开始回复

    if response.status == 200:
      # 设置一个块大小
      while True:
        chunk = response.readline();
        if not chunk:
          break # 没有更多数据了
        data = chunk.decode("utf-8")
        # print(data);
        if data.startswith("data:{"):
          data = json.loads(data[5:])

          # 处理空类型情况
          data_type = data.get('type', None)
          if not data_type:
            continue

          # 处理错误
          if 'error' == data_type:
            print("\n" + "=" * 20 + "错误" + "=" * 20 + "\n")
            print(f'code: {payload.get("code", "")}, message: {payload.get("message", "")}')
            answer_content = "系统繁忙,请稍后重试!"
            break

          # 处理空内容情况
          payload = data.get('payload', None)
          if not payload:
            continue

          is_from_self = payload.get('is_from_self', False) # 消息是否由自己发出
          if is_from_self:
            # 消息上行敏感检测
            if payload.get('is_evil', False):
              print("\n" + "=" * 20 + "敏感信息" + "=" * 20 + "\n")
              print(data)
              answer_content = "存在敏感信息!"
              break
            else:
              continue
          else:
            # 处理token_stat
            if 'token_stat' == data_type:
              token_count = payload.get('token_count', token_count)
              continue

            # 处理参考来源
            if 'reference' == data_type:
              references = payload.get('references', references)

            # 处理开始思考过程
            if 'thought' == data_type and not is_thought:
              print("\n" + "=" * 20 + "开始思考" + "=" * 20 + "\n")
              is_thought = True

            # 处理思考
            if 'thought' == data_type:
              procedures = payload.get('procedures', [])
              if len(procedures) > 0:
                debugging = procedures[0].get('debugging', {})
                content = debugging.get('content', '')
                print(content[len(thought):], end='', flush=True)
                thought = content

            # 处理开始回答的情况
            if 'reply' == data_type and not is_answering:
              print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
              is_answering = True

            if 'reply' == data_type:
              content = payload.get('content', '')
              print(content[len(answer_content):], end='', flush=True)
              answer_content = content

      conn.close()
      print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
      print(token_count)
      for i in range(len(references)):
        url = references[i].get('url', '')
        answer_content = answer_content.replace(f'[{i + 1}]', f'![{i + 1}]({url})')
      print("\n" + "=" * 20 + "补充参考来源" + "=" * 20 + "\n")
      print(answer_content)
      return answer_content
    else:
      print(f'\n请求失败,状态码: {response.status},内容: {response.read().decode("utf-8")}')
      conn.close()
      return "系统繁忙,请稍后重试!"

  def readRoleByFilePath(self, path):
    text = '你是个乐于助人的助手'
    try:
      with open(path, 'r', encoding='utf-8') as file:
        # 读取所有文本
        text = file.read()
    except Exception as e:
      print(f"\n读取角色发生错误:{e}")
    return text

if __name__ == "__main__":
  try:
    ai = TencentAi('https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse', '应用密钥')
    ai.chatAi('你好,能介绍一下自己吗?', '会话1', ai.readRoleByFilePath('角色.txt'))
  except Exception as e:
    print(f"\n发生错误:{e}")
最后修改:2025 年 02 月 22 日
如果觉得我的文章对你有用,请随意赞赏