import asyncio
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING

from app.core.stats_store import StatsStore

if TYPE_CHECKING:
    from app.core.dispatcher import Dispatcher
class StatsExporter:
    """Snapshot dispatcher key statistics into a ``StatsStore`` and render the dashboard.

    A background asyncio task (see ``start`` / ``stop``) collects statistics from
    the dispatcher every ``interval`` seconds and hands them to the store. The
    ``render_dashboard`` coroutine performs the same collection on demand and
    returns the rendered dashboard text.

    NOTE(review): several ``_generate_*`` helpers compute presentation values
    (colours, toggle labels, percentages) that the template text in this file
    does not interpolate. They are kept untouched; confirm whether markup that
    consumed them is missing from the templates.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, dispatcher: "Dispatcher", output_dir: str = "stats", interval: int = 10):
        """Create the exporter, its output directory and its backing store.

        Args:
            dispatcher: Source of key statistics, usage summaries and history.
            output_dir: Directory that holds ``stats.sqlite3``; created if missing.
            interval: Pause, in seconds, between two background export cycles.
        """
        self.dispatcher = dispatcher
        self.output_dir = Path(output_dir)
        self.interval = interval
        self._running = False
        # Handle of the background loop task; None while no loop is running.
        self._task = None
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.store = StatsStore(self.output_dir / "stats.sqlite3")

    async def start(self):
        """Start the background export loop.

        Calling this while a loop task is still alive is a no-op, so a repeated
        ``start`` cannot overwrite (and thereby orphan) the running task.
        """
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._run_loop())

    async def stop(self):
        """Stop the background export loop and wait for its task to finish."""
        self._running = False
        # Drop the handle first so the exporter never keeps a finished task around.
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass

    async def _run_loop(self):
        """Export statistics every ``self.interval`` seconds until stopped."""
        while self._running:
            try:
                await self._export_stats()
            except Exception as exc:
                # Boundary of the background task: one failed cycle must not end
                # the loop, so the error is logged (with traceback) and swallowed.
                self._logger.exception("Error exporting stats: %s", exc)
            await asyncio.sleep(self.interval)

    async def export_once(self):
        """Run a single export cycle immediately."""
        await self._export_stats()

    def clear_daily_usage_day(self, day: str):
        """Ask the store to delete the record(s) for ``day`` (``StatsStore.delete_day``)."""
        self.store.delete_day(day)

    async def render_dashboard(self) -> str:
        """Collect fresh data (which also persists it) and return the rendered dashboard."""
        stats, usage_summary, daily_usage, config_stats = await self._collect_dashboard_data()
        return self._generate_html(config_stats, stats, usage_summary, daily_usage)

    def _get_endpoint_status_for_config(self, config_id: str, keys: list) -> list:
        """Return the key manager's endpoint info for ``config_id``.

        ``keys`` is accepted for interface compatibility but is not used.
        """
        return self.dispatcher.key_manager.get_endpoint_info(config_id)

    async def _export_stats(self):
        """Run one collection cycle; the collected values themselves are discarded."""
        await self._collect_dashboard_data()

    async def _collect_dashboard_data(self) -> tuple[list, dict, list, dict]:
        """Fetch live data from the dispatcher, persist it, and group it per config.

        Returns:
            ``(stats, usage_summary, daily_usage, config_stats)`` where
            ``daily_usage`` is the store's 7-day history, falling back to the
            live history when the store returns nothing truthy.
        """
        stats = await self.dispatcher.get_all_key_stats()
        usage_summary = await self.dispatcher.get_usage_summary()
        live_daily_usage = await self.dispatcher.get_daily_usage_history(days=7)
        # NOTE(review): the store is called synchronously from a coroutine; if
        # StatsStore performs blocking I/O this stalls the event loop — confirm.
        self.store.save_export(stats, usage_summary, live_daily_usage)
        daily_usage = self.store.get_daily_usage(days=7) or live_daily_usage
        config_stats = self._group_by_config(stats)
        return stats, usage_summary, daily_usage, config_stats

    def _group_by_config(self, stats: list) -> dict:
        """Aggregate per-key stat dicts into one summary dict per ``config_id``.

        Keys without a ``config_id`` are grouped under ``'default'``. Model name,
        provider and enabled flag are taken from the first key seen for a config.
        Any status other than ``'active'`` / ``'cooldown'`` counts as disabled.
        Endpoints are de-duplicated by ``endpoint_idx`` (first key wins) and
        returned sorted by that index.
        """
        grouped = {}
        # config_id -> {endpoint_idx: endpoint dict}; a dict lookup replaces a
        # linear scan of the endpoint list for every key.
        endpoints_by_config = {}
        for s in stats:
            config_id = s.get('config_id', 'default')
            cfg = grouped.get(config_id)
            if cfg is None:
                cfg = grouped[config_id] = {
                    'config_id': config_id,
                    'model_name': s['model_name'],
                    'provider': s['provider'],
                    'enabled': s['enabled'],
                    'keys': [],
                    'endpoints': [],
                    'total_concurrency': 0,
                    'total_rpm': 0,
                    'total_tpm': 0,
                    'total_rpd': 0,
                    'active_count': 0,
                    'cooldown_count': 0,
                    'disabled_count': 0,
                }
                endpoints_by_config[config_id] = {}

            usage = s['usage']
            cfg['keys'].append(s)
            cfg['total_concurrency'] += usage['current_concurrency']
            cfg['total_rpm'] += usage['current_rpm']
            cfg['total_tpm'] += usage['current_tpm']
            cfg['total_rpd'] += usage['current_rpd']

            status = s['status']
            if status == 'active':
                cfg['active_count'] += 1
            elif status == 'cooldown':
                cfg['cooldown_count'] += 1
            else:
                cfg['disabled_count'] += 1

            endpoint_idx = s.get('endpoint_idx')
            if endpoint_idx is not None:
                # setdefault keeps the first record seen for an endpoint index.
                endpoints_by_config[config_id].setdefault(endpoint_idx, {
                    'idx': endpoint_idx,
                    'key_id': s['id'],
                    'status': status,
                    'enabled': s['enabled'],
                    'api_base': s.get('api_base', ''),
                    'usage': usage,
                    'limits': s['limits'],
                })

        for config_id, cfg in grouped.items():
            cfg['endpoints'] = sorted(
                endpoints_by_config[config_id].values(),
                key=lambda ep: ep['idx'],
            )
        return grouped

    def _generate_html(self, config_stats: dict, all_stats: list, usage_summary: dict, daily_usage: list) -> str:
        """Render the full dashboard text.

        Args:
            config_stats: Output of ``_group_by_config``.
            all_stats: Flat per-key stats; not used by the rendering below.
            usage_summary: Totals consumed by ``_generate_usage_summary_cards``.
            daily_usage: Per-day records consumed by ``_generate_daily_usage_section``.
        """
        # NOTE(review): now_str is not interpolated by the template below — confirm.
        now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        daily_usage_section = self._generate_daily_usage_section(daily_usage)
        config_rows = "".join(
            self._generate_config_row(config_id, cfg)
            for config_id, cfg in config_stats.items()
        )
        return f"""
KeyPool Dashboard
{daily_usage_section}
{self._generate_usage_summary_cards(usage_summary)}
| Config ID |
Model |
Provider |
Status |
Conc |
RPM |
TPM |
RPD |
Keys |
Actions |
{config_rows}
"""

    def _generate_config_row(self, config_id: str, cfg: dict) -> str:
        """Render one config summary row followed by its endpoint and key tables."""
        enabled = cfg['enabled']
        # NOTE(review): the three values below are not interpolated by the
        # template in this method; kept as-is — confirm.
        status_color = "#10b981" if enabled else "#64748b"
        toggle_text = "禁用" if enabled else "启用"
        toggle_class = "btn-disable" if enabled else "btn-enable"
        # A badge shows its count only when that count is non-zero.
        active_badge = f"{cfg['active_count']}" if cfg['active_count'] > 0 else ""
        cooldown_badge = f"{cfg['cooldown_count']}" if cfg['cooldown_count'] > 0 else ""
        disabled_badge = f"{cfg['disabled_count']}" if cfg['disabled_count'] > 0 else ""
        return f"""
|
{self._truncate(config_id, 25)}
|
{cfg['model_name'][:30]} |
{cfg['provider']} |
{'ON' if enabled else 'OFF'}
|
{cfg['total_concurrency']} |
{cfg['total_rpm']} |
{self._format_number(cfg['total_tpm'])} |
{cfg['total_rpd']} |
{len(cfg['keys'])} {active_badge}{cooldown_badge}{disabled_badge} |
|
{self._generate_endpoints_section(cfg)}
| Key ID |
Owner |
Status |
Concurrency |
RPM |
TPM |
RPD |
429 |
{self._generate_key_rows(cfg['keys'])}
|
"""

    def _generate_daily_usage_section(self, daily_usage: list) -> str:
        """Render the daily-usage section from per-day dicts carrying ``total_rpd``.

        NOTE(review): the totals, average, peak and per-day bars computed here
        are not interpolated by the returned template — confirm.
        """
        if not daily_usage:
            return """
"""
        totals = [item.get("total_rpd", 0) for item in daily_usage]
        total_7d = sum(totals)
        # daily_usage is non-empty past the guard above, so totals is too.
        avg_7d = int(total_7d / len(totals))
        peak_7d = max(totals)
        columns = "".join(self._generate_daily_usage_bar(item, peak_7d) for item in daily_usage)
        return f"""
"""

    def _generate_daily_usage_bar(self, item: dict, peak_value: int) -> str:
        """Render one day's entry: its formatted request count and its label.

        NOTE(review): height_pct, bar_class and title are not interpolated by
        the returned template — confirm.
        """
        value = item.get("total_rpd", 0)
        peak = max(peak_value, 1)  # avoid dividing by zero when every day is empty
        # Bars never shrink below 8 percent, including zero-valued days.
        height_pct = max(8, int(value / peak * 100)) if value > 0 else 8
        bar_class = "daily-bar" if value > 0 else "daily-bar is-zero"
        title = f"{item.get('day', item.get('label', ''))}: {value}"
        return f"""
{self._format_number(value)}
{item.get('label', '-')}
"""

    def _generate_usage_summary_cards(self, usage_summary: dict) -> str:
        """Render the summary cards.

        Every key except ``total_yesterday_rpd`` must be present in
        ``usage_summary``; a missing one raises ``KeyError``.

        NOTE(review): the per-card fields are not interpolated by the card
        template below — confirm.
        """
        cards = [
            ("total", "K", usage_summary["total_keys"], "Total Keys"),
            ("active", "A", usage_summary["active_keys"], "Active"),
            ("cooldown", "C", usage_summary["cooldown_keys"], "Cooldown"),
            ("disabled", "D", usage_summary["disabled_keys"], "Disabled"),
            ("concurrency", "Q", usage_summary["total_concurrency"], "Concurrency"),
            ("rpm", "R", usage_summary["total_rpm"], "RPM"),
            ("tpm", "T", self._format_number(usage_summary["total_tpm"]), "TPM"),
            ("rpd", "D", usage_summary["total_rpd"], "RPD"),
            ("rpd", "Y", usage_summary.get("total_yesterday_rpd", 0), "Yesterday RPD"),
        ]
        card_html = "".join(
            f"""
"""
            for card_class, icon, value, label in cards
        )
        return f"""
{card_html}
"""

    def _generate_endpoints_section(self, cfg: dict) -> str:
        """Render the endpoint table for a config; empty string when it has no endpoints."""
        endpoints = cfg.get('endpoints', [])
        if not endpoints:
            return ""
        # NOTE(review): config_id and the per-endpoint colour/toggle values are
        # not interpolated by the templates below — confirm.
        config_id = cfg['config_id']
        rows = []
        for ep in endpoints:
            status_color = "#4ade80" if ep['status'] == 'active' else "#6b7280"
            toggle_text = "禁用" if ep['enabled'] else "启用"
            toggle_class = "btn-disable" if ep['enabled'] else "btn-enable"
            # Show the address without its scheme, capped at 30 characters.
            api_base = ep.get('api_base', '')
            url_display = api_base.replace('http://', '').replace('https://', '')[:30]
            usage, limits = ep['usage'], ep['limits']
            conc_pct = self._usage_pct(usage['current_concurrency'], limits['max_concurrency'])
            rpm_pct = self._usage_pct(usage['current_rpm'], limits['rpm'])
            rows.append(f"""
| EP-{ep['idx']} |
{url_display} |
{ep['status']} |
{self._progress_bar(conc_pct, usage['current_concurrency'], limits['max_concurrency'])} |
{self._progress_bar(rpm_pct, usage['current_rpm'], limits['rpm'])} |
|
""")
        rows = "".join(rows)
        return f"""
本地节点 (Endpoints)
| Node |
URL |
Status |
Concurrency |
RPM |
Action |
{rows}
"""

    def _generate_key_rows(self, keys: list) -> str:
        """Render one table row per key stat dict and return them concatenated."""
        rows = []
        for k in keys:
            # NOTE(review): status_color is not interpolated by the row template — confirm.
            status_color = self._get_status_color(k["status"])
            usage, limits = k["usage"], k["limits"]
            conc_pct = self._usage_pct(usage["current_concurrency"], limits["max_concurrency"])
            rpm_pct = self._usage_pct(usage["current_rpm"], limits["rpm"])
            tpm_pct = self._usage_pct(usage["current_tpm"], limits["tpm"])
            rpd_pct = self._usage_pct(usage["current_rpd"], limits["rpd"])
            cooldown_str = "429" if k["status"] == "cooldown" else "-"
            rows.append(f"""
| {self._truncate(k['id'], 20)} |
{k.get('owner', '-')} |
{k['status']} |
{self._progress_bar(conc_pct, usage['current_concurrency'], limits['max_concurrency'])} |
{self._progress_bar(rpm_pct, usage['current_rpm'], limits['rpm'])} |
{self._progress_bar(tpm_pct, usage['current_tpm'], limits['tpm'])} |
{self._progress_bar(rpd_pct, usage['current_rpd'], limits['rpd'])} |
{cooldown_str} |
""")
        return "".join(rows)

    def _get_status_color(self, status: str) -> str:
        """Map a status name (case-insensitive) to a hex colour; unknown names get grey."""
        colors = {
            "active": "#4ade80",
            "cooldown": "#f97316",
            "disabled": "#6b7280",
            "error": "#ef4444",
        }
        return colors.get(status.lower(), "#6b7280")

    def _progress_bar(self, percentage: float, current: int, limit: int) -> str:
        """Render a usage bar, or ``N/A`` when ``limit`` is zero.

        NOTE(review): the clamped percentage, the colour, ``current`` and
        ``limit`` are not interpolated by the returned template — confirm.
        """
        if limit == 0:
            return f"N/A"
        percentage = min(100, max(0, percentage))
        # Green below 50 percent, amber below 80 percent, red otherwise.
        if percentage < 50:
            color = "#4ade80"
        elif percentage < 80:
            color = "#fbbf24"
        else:
            color = "#ef4444"
        return f"""
"""

    def _format_number(self, num: int) -> str:
        """Abbreviate ``num`` with a ``K`` / ``M`` suffix and one decimal place.

        The million threshold sits at 999_950 because that is where the
        one-decimal ``K`` rendering would round up to ``1000.0K``. Thresholds
        are applied to the magnitude, so negative values are abbreviated the
        same way as positive ones.
        """
        magnitude = abs(num)
        if magnitude >= 999_950:
            return f"{num/1000000:.1f}M"
        if magnitude >= 1000:
            return f"{num/1000:.1f}K"
        return str(num)

    @staticmethod
    def _usage_pct(current, limit):
        """Return ``current`` as a percentage of ``limit``; 0 when ``limit`` is not positive."""
        return (current / limit * 100) if limit > 0 else 0

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Cut ``text`` to ``limit`` characters, appending ``...`` only if something was cut."""
        return text if len(text) <= limit else f"{text[:limit]}..."