Files
infstarweb/statsprocess.py

262 lines
7.4 KiB
Python

import os
import json
import threading
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from tqdm import tqdm # Add tqdm for progress bars
STATS_DIR = "stats"
MAX_WORKERS = max(4, min(16, int(os.environ.get("STATS_MAX_WORKERS", (os.cpu_count() or 4) * 2))))
# HTTP Basic Auth for BASE_URL (from environment variables)
BASE_URL = os.environ.get("STATS_BASE_URL", "")
STATS_USER = os.environ.get("STATS_USER", "")
STATS_PASS = os.environ.get("STATS_PASS", "")
BASE_AUTH = (STATS_USER, STATS_PASS) if STATS_USER else None
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
thread_local = threading.local()
def create_session():
session = requests.Session()
session.trust_env = False # Ignore HTTP_PROXY / HTTPS_PROXY env vars
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=MAX_WORKERS,
pool_maxsize=MAX_WORKERS,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def get_session():
session = getattr(thread_local, "session", None)
if session is None:
session = create_session()
thread_local.session = session
return session
if BASE_AUTH:
print(f"Using authentication for BASE_URL (user: {STATS_USER})")
else:
print("No STATS_USER/STATS_PASS set, accessing BASE_URL without auth.")
# Ensure directories exist
os.makedirs(STATS_DIR, exist_ok=True)
print("Fetching file list...")
fetch_failed = False
try:
response = get_session().get(BASE_URL, timeout=10, auth=BASE_AUTH)
response.raise_for_status()
content = response.text
# Regex for UUID.json
files = sorted(set(re.findall(r'href="([0-9a-f-]{36}\.json)"', content)))
print(f"Found {len(files)} player stats files.")
except Exception as e:
print(f"Error fetching file list: {e}")
files = []
fetch_failed = True
def load_name_cache():
summary_path = os.path.join(STATS_DIR, 'summary.json')
if not os.path.exists(summary_path):
return {}
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary = json.load(f)
except Exception:
return {}
return {
player.get('uuid'): player.get('name')
for player in summary.get('players', [])
if player.get('uuid') and player.get('name') and player.get('name') != "Unknown"
}
def get_player_name(uuid):
# Try Ashcon first
try:
r = get_session().get(f"https://api.ashcon.app/mojang/v2/user/{uuid}", timeout=5)
if r.status_code == 200:
return r.json().get('username')
except Exception:
pass
# Try Mojang Session
try:
r = get_session().get(f"https://sessionserver.mojang.com/session/minecraft/profile/{uuid}", timeout=5)
if r.status_code == 200:
return r.json().get('name')
except Exception:
pass
return "Unknown"
def format_dist(cm):
m = cm / 100
if m < 1000:
return f"{m:.1f} m"
return f"{m / 1000:.2f} km"
def format_time(ticks):
seconds = ticks / 20
if seconds < 60:
return f"{seconds:.3f}"
minutes = seconds / 60
if minutes < 60:
return f"{minutes:.3f} 分钟"
hours = minutes / 60
if hours < 24:
return f"{hours:.3f} 小时"
days = hours / 24
return f"{days:.3f}"
def process_player(filename, name_cache):
uuid = filename.replace(".json", "")
json_path = os.path.join(STATS_DIR, filename)
# 1. Download/Load JSON
data = None
try:
r = get_session().get(BASE_URL + filename, timeout=10, auth=BASE_AUTH)
if r.status_code == 200:
data = r.json()
else:
print(f"Failed to download {filename}")
return None
except Exception as e:
print(f"Error downloading {filename}: {e}")
return None
if not data:
return None
# 2. Get Name
player_name = name_cache.get(uuid, "Unknown")
if player_name == "Unknown":
player_name = get_player_name(uuid)
# 3. Download Avatar - SKIPPED to avoid rate limits
# The frontend will handle dynamic loading of avatars using Minotar/Crafatar URLs.
# 4. Calculate Stats
stats = data.get('stats', {})
# Walk
# Handle both modern ':' and potentially flattened or different versions if necessary,
# but usually proper JSON has "minecraft:custom"
# "minecraft:walk_one_cm"
custom = stats.get('minecraft:custom', {})
walk_cm = custom.get('minecraft:walk_one_cm', 0)
walk_fmt = format_dist(walk_cm)
# Play Time (1 tick = 1/20 second)
play_time_ticks = custom.get('minecraft:play_time', 0)
play_time_fmt = format_time(play_time_ticks)
# Mined
mined = stats.get('minecraft:mined', {})
total_mined = sum(mined.values())
# Placed (Used)
used = stats.get('minecraft:used', {})
total_placed = sum(used.values())
# Deaths (Killed By)
killed_by = stats.get('minecraft:killed_by', {})
total_deaths = sum(killed_by.values())
# Kills (Killed)
killed = stats.get('minecraft:killed', {})
total_kills = sum(killed.values())
# Inject into JSON
data['extra'] = {
'player_name': player_name,
'formatted_walk': walk_fmt,
'walk_cm': walk_cm,
'total_mined': total_mined,
'total_placed': total_placed,
'total_deaths': total_deaths,
'total_kills': total_kills,
'play_time_fmt': play_time_fmt,
'play_time_ticks': play_time_ticks
}
# Save
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return {
'uuid': uuid,
'name': player_name,
'avatar': f"https://minotar.net/avatar/{player_name}/64" if player_name != "Unknown" else f"https://minotar.net/avatar/{uuid}/64",
'stats': {
'walk_fmt': walk_fmt,
'walk_raw': walk_cm,
'mined': total_mined,
'placed': total_placed,
'deaths': total_deaths,
'kills': total_kills,
'play_time_fmt': play_time_fmt,
'play_time_raw': play_time_ticks
}
}
name_cache = load_name_cache()
results = []
if files:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_map = {
executor.submit(process_player, filename, name_cache): filename
for filename in files
}
for future in tqdm(as_completed(future_map), total=len(future_map), desc="Processing players"):
try:
result = future.result()
except Exception as e:
print(f"Worker failed for {future_map[future]}: {e}")
continue
if result is not None:
results.append(result)
if fetch_failed:
print("Skipping summary update because file list fetch failed.")
raise SystemExit(1)
# Sort by name perhaps? Or just raw list.
results.sort(key=lambda x: x['name'])
summary = {
'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'players': results
}
with open(os.path.join(STATS_DIR, 'summary.json'), 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=4)
print("Processing complete. Summary saved to stats/summary.json")