通过python长期开启CDP

定时自动重启浏览器,当浏览器关闭时自动开启

import logging
from logging.handlers import TimedRotatingFileHandler
import subprocess
import psutil
import shutil
import schedule
import time
import os
import sys

# 配置日志基本设置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 日志文件路径
log_file_path = 'log/output.log'
# 确保日志目录存在
log_directory = os.path.dirname(log_file_path)
if not os.path.exists(log_directory):
    os.makedirs(log_directory)
# 创建TimedRotatingFileHandler,每天轮换日志文件
handler = TimedRotatingFileHandler(log_file_path, encoding='utf-8', when="midnight", interval=1, backupCount=30)
# 使用日志格式器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 为日志记录系统添加handler
logging.getLogger().addHandler(handler)


# 定义 Chrome 路径(常见安装位置,如果不对请手动修改)
# "C:\Program Files\Google\Chrome\Application\chrome.exe"
chromePath = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# 定义调试端口
# 由于chrome默认只能127.0.0.1地址访问因此需要使用 netsh 端口转发 netsh interface portproxy add v4tov4 listenport=9222 listenaddress=你的局域网IP connectport=9222 connectaddress=10.126.126.2
debugPort = 9222
# 定义用户数据目录,用于保存你的登录信息、插件等,实现持久化
userDataDir = "D:\openclaw\Launch-ChromeDebug\chrome-debug-profile"

process_name = "chrome.exe"
process_search = f"--remote-debugging-port={debugPort}"
program_start_command = [chromePath, f"--remote-debugging-port={debugPort}", "--remote-debugging-address=0.0.0.0", f"--user-data-dir={userDataDir}", '--remote-allow-origins=*']
program_directory = "."

# 查询程序的pid
def get_process_id(process_name):
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        if process_name == proc.info['name']:
            # logging.info(f"cmdline:{proc.info['cmdline']}")
            for cmd_arg in proc.info['cmdline']:
                if process_search in cmd_arg:
                    # logging.info(f"程序:{process_name},pid:{proc.info['pid']}")
                    return proc.info['pid']
    return None

# 检测程序是否在执行
def check_process_running(process_name):
    # for process in psutil.process_iter(['name', 'cmdline']):
    #     if process.info['name'] and process.info['name'] == process_name:
    #         return True
    #     elif process.info['cmdline'] and process_name in process.info['cmdline']:
    #         return True
    # return False
    pid = get_process_id(process_name)
    if pid:
        return True
    else:
        return False

# 关闭服务器
def stop_program():
    logging.info("服务器关闭")
    if sys.platform.startswith('win'):
        # subprocess.Popen('taskkill /F /IM ' + process_name, shell=True)
        pid = get_process_id(process_name)
        logging.info(f"程序:{process_name},pid:{pid}")
        if pid:
            subprocess.Popen(f"taskkill /F /PID {pid}", shell=True)
    # elif sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
    #     subprocess.Popen(['pkill', '-f', 'your_program.py'])

# 启动服务器
def start_program():
    logging.info("服务器启动")
    if sys.platform.startswith('win'):
        # subprocess.Popen(program_path, shell=True)
        # subprocess.run([program_start_command], input=b'\n', shell=True, cwd=program_directory)
        process = subprocess.Popen(program_start_command,
                                    shell=True,
                                    cwd=program_directory,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True,
                                    encoding='gbk')
        # # 实时获取控制台输出内容
        # for line in process.stdout:
        #     out = process.stdout.readline()
        #     logging.info(out, end='')
        #     if out == '' and process.poll() is not None:
        #         break
        #     elif '请按任意键继续. . .' in out:
        #         # 当输出中出现特定内容时
        #         process.stdin.write('\n')
        #         process.stdin.flush()  # 确保输入被发送
        #         # 

        time.sleep(10)  # 等待10秒
        # 强制终止子进程
        process.kill()
        # 等待进程结束
        process.wait()
        # 检查是否有错误发生
        if process.returncode != 0:
            logging.info(f"Command execution failed with exit code {process.returncode}")
    # elif sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
    #     subprocess.Popen(['python', 'your_program.py'])

# 重启任务
restarting = False
def stop_and_restart_job():
    # 在更改全局变量前先声明
    global restarting  # 声明restarting为全局变量
    restarting = True
    while check_process_running(process_name): # 等待程序结束完成
        stop_program()
        time.sleep(10)  # 等待10秒,确保程序完全关闭
    logging.info(f"启动{process_name}程序")
    while not check_process_running(process_name): # 查询程序是否启动,如果未启动则再次启动
        start_program()
        time.sleep(60)
    restarting = False

# 当程序崩溃时自动启动
def auto_start_job():
    # 在更改全局变量前先声明
    global start_time  # 声明start_time为全局变量
    if not restarting and not check_process_running(process_name):
        logging.info("检测到服务器未启动,开始自启动")
        start_program()

# 定义任务调度函数
def schedule_job():
    schedule.every(1).minutes.do(auto_start_job)   # 每分钟检测一次程序是否崩溃
    schedule.every().day.at("23:53").do(stop_and_restart_job)   # 每天23:53重启服务器

# 启动任务调度器
schedule_job()

logging.info(f"检测{process_name}程序是否在执行:{check_process_running(process_name)}")
# if not check_process_running(process_name):
#     logging.info("程序未执行,启动程序更新")
#     run_steamcmd_update_command_realtime()
#     start_program()
while True:
    schedule.run_pending()  # 检查是否有任务需要执行
    time.sleep(1) # 避免过于频繁的检查
最后修改:2026 年 03 月 08 日
如果觉得我的文章对你有用,请随意赞赏