通过python长期开启CDP
定时自动重启浏览器,当浏览器关闭时自动开启
import logging
from logging.handlers import TimedRotatingFileHandler
import subprocess
import psutil
import shutil
import schedule
import time
import os
import sys
# 配置日志基本设置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 日志文件路径
log_file_path = 'log/output.log'
# 确保日志目录存在
log_directory = os.path.dirname(log_file_path)
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# 创建TimedRotatingFileHandler,每天轮换日志文件
handler = TimedRotatingFileHandler(log_file_path, encoding='utf-8', when="midnight", interval=1, backupCount=30)
# 使用日志格式器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 为日志记录系统添加handler
logging.getLogger().addHandler(handler)
# 定义 Chrome 路径(常见安装位置,如果不对请手动修改)
# "C:\Program Files\Google\Chrome\Application\chrome.exe"
chromePath = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# 定义调试端口
# 由于chrome默认只能127.0.0.1地址访问因此需要使用 netsh 端口转发 netsh interface portproxy add v4tov4 listenport=9222 listenaddress=你的局域网IP connectport=9222 connectaddress=10.126.126.2
debugPort = 9222
# 定义用户数据目录,用于保存你的登录信息、插件等,实现持久化
userDataDir = "D:\openclaw\Launch-ChromeDebug\chrome-debug-profile"
process_name = "chrome.exe"
process_search = f"--remote-debugging-port={debugPort}"
program_start_command = [chromePath, f"--remote-debugging-port={debugPort}", "--remote-debugging-address=0.0.0.0", f"--user-data-dir={userDataDir}", '--remote-allow-origins=*']
program_directory = "."
# 查询程序的pid
def get_process_id(process_name):
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if process_name == proc.info['name']:
# logging.info(f"cmdline:{proc.info['cmdline']}")
for cmd_arg in proc.info['cmdline']:
if process_search in cmd_arg:
# logging.info(f"程序:{process_name},pid:{proc.info['pid']}")
return proc.info['pid']
return None
# 检测程序是否在执行
def check_process_running(process_name):
# for process in psutil.process_iter(['name', 'cmdline']):
# if process.info['name'] and process.info['name'] == process_name:
# return True
# elif process.info['cmdline'] and process_name in process.info['cmdline']:
# return True
# return False
pid = get_process_id(process_name)
if pid:
return True
else:
return False
# 关闭服务器
def stop_program():
logging.info("服务器关闭")
if sys.platform.startswith('win'):
# subprocess.Popen('taskkill /F /IM ' + process_name, shell=True)
pid = get_process_id(process_name)
logging.info(f"程序:{process_name},pid:{pid}")
if pid:
subprocess.Popen(f"taskkill /F /PID {pid}", shell=True)
# elif sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
# subprocess.Popen(['pkill', '-f', 'your_program.py'])
# 启动服务器
def start_program():
logging.info("服务器启动")
if sys.platform.startswith('win'):
# subprocess.Popen(program_path, shell=True)
# subprocess.run([program_start_command], input=b'\n', shell=True, cwd=program_directory)
process = subprocess.Popen(program_start_command,
shell=True,
cwd=program_directory,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
encoding='gbk')
# # 实时获取控制台输出内容
# for line in process.stdout:
# out = process.stdout.readline()
# logging.info(out, end='')
# if out == '' and process.poll() is not None:
# break
# elif '请按任意键继续. . .' in out:
# # 当输出中出现特定内容时
# process.stdin.write('\n')
# process.stdin.flush() # 确保输入被发送
# #
time.sleep(10) # 等待10秒
# 强制终止子进程
process.kill()
# 等待进程结束
process.wait()
# 检查是否有错误发生
if process.returncode != 0:
logging.info(f"Command execution failed with exit code {process.returncode}")
# elif sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
# subprocess.Popen(['python', 'your_program.py'])
# 重启任务
restarting = False
def stop_and_restart_job():
# 在更改全局变量前先声明
global restarting # 声明restarting为全局变量
restarting = True
while check_process_running(process_name): # 等待程序结束完成
stop_program()
time.sleep(10) # 等待10秒,确保程序完全关闭
logging.info(f"启动{process_name}程序")
while not check_process_running(process_name): # 查询程序是否启动,如果未启动则再次启动
start_program()
time.sleep(60)
restarting = False
# 当程序崩溃时自动启动
def auto_start_job():
# 在更改全局变量前先声明
global start_time # 声明start_time为全局变量
if not restarting and not check_process_running(process_name):
logging.info("检测到服务器未启动,开始自启动")
start_program()
# 定义任务调度函数
def schedule_job():
schedule.every(1).minutes.do(auto_start_job) # 每分钟检测一次程序是否崩溃
schedule.every().day.at("23:53").do(stop_and_restart_job) # 每天23:53重启服务器
# 启动任务调度器
schedule_job()
logging.info(f"检测{process_name}程序是否在执行:{check_process_running(process_name)}")
# if not check_process_running(process_name):
# logging.info("程序未执行,启动程序更新")
# run_steamcmd_update_command_realtime()
# start_program()
while True:
schedule.run_pending() # 检查是否有任务需要执行
time.sleep(1) # 避免过于频繁的检查