Files
infstarweb/statsprocess.py

225 lines
6.9 KiB
Python

import os
import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from tqdm import tqdm # Add tqdm for progress bars
# Remote directory listing that serves one "<uuid>.json" stats file per player.
BASE_URL = "http://x2.sjcmc.cn:15960/stats/"
# Local output directory for the enriched per-player files and the summary.
STATS_DIR = "stats"

# HTTP Basic Auth for BASE_URL (from environment variables).
# Auth is enabled only when STATS_USER is non-empty.
# NOTE(review): BASE_URL uses plain http, so Basic Auth credentials are sent
# unencrypted -- confirm this is acceptable for the target server.
STATS_USER = os.environ.get("STATS_USER", "")
STATS_PASS = os.environ.get("STATS_PASS", "")
BASE_AUTH = (STATS_USER, STATS_PASS) if STATS_USER else None

# Create a session that bypasses system proxy and retries on failure.
session = requests.Session()
session.trust_env = False  # Ignore HTTP_PROXY / HTTPS_PROXY env vars
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
session.mount("http://", HTTPAdapter(max_retries=retry_strategy))
session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

if BASE_AUTH:
    print(f"Using authentication for BASE_URL (user: {STATS_USER})")
else:
    print("No STATS_USER/STATS_PASS set, accessing BASE_URL without auth.")

# Ensure directories exist
os.makedirs(STATS_DIR, exist_ok=True)

print("Fetching file list...")
try:
    response = session.get(BASE_URL, timeout=10, auth=BASE_AUTH)
    response.raise_for_status()
    content = response.text
    # Links of the form href="<36-char uuid>.json" in the directory listing.
    # Deduplicate, then sort so the processing order is the same on every
    # run (plain set iteration order is not stable between runs).
    files = sorted(set(re.findall(r'href="([0-9a-f-]{36}\.json)"', content)))
    print(f"Found {len(files)} player stats files.")
except requests.RequestException as e:
    # Only network/HTTP failures are treated as "no files"; anything else is
    # a programming error and should surface instead of being hidden here.
    print(f"Error fetching file list: {e}")
    files = []
def get_player_name(uuid):
    """Resolve a player's username from their UUID.

    The Ashcon API is queried first, then the Mojang session server. The
    lookup is best-effort: a failed request, a non-200 status, an
    unparsable body, or a response without a usable name all fall through
    to the next source.

    Args:
        uuid: Player UUID string (the stats filename without ".json").

    Returns:
        The username as a non-empty string, or "Unknown" when no source
        yields one. Never returns None.
    """
    # (URL, JSON key that holds the name), tried in order.
    sources = (
        (f"https://api.ashcon.app/mojang/v2/user/{uuid}", 'username'),
        (f"https://sessionserver.mojang.com/session/minecraft/profile/{uuid}", 'name'),
    )
    for url, key in sources:
        try:
            r = session.get(url, timeout=5)
            if r.status_code != 200:
                continue
            name = r.json().get(key)
        except Exception:
            # Still best-effort, but unlike a bare ``except:`` this lets
            # KeyboardInterrupt / SystemExit propagate so the run can be
            # aborted with Ctrl-C.
            continue
        # A 200 response without the expected key must not leak None to
        # callers; try the next source instead.
        if isinstance(name, str) and name:
            return name
    return "Unknown"
def _format_distance(cm):
    """Format a distance given in centimetres as metres (< 1 km) or kilometres."""
    metres = cm / 100
    if metres < 1000:
        return f"{metres:.1f} m"
    return f"{metres / 1000:.2f} km"


def _format_play_time(ticks):
    """Format a tick count (20 ticks = 1 second) in the largest fitting unit.

    Every branch carries its unit (seconds, minutes, hours, days) so that
    values such as "30.000" are never ambiguous between seconds and days.
    """
    seconds = ticks / 20
    if seconds < 60:
        return f"{seconds:.3f} 秒"
    minutes = seconds / 60
    if minutes < 60:
        return f"{minutes:.3f} 分钟"
    hours = minutes / 60
    if hours < 24:
        return f"{hours:.3f} 小时"
    return f"{hours / 24:.3f} 天"


def _load_cached_name(json_path):
    """Return the player name saved by a previous run, or "Unknown"."""
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            cached = json.load(f)
        name = cached['extra']['player_name']
    except (OSError, ValueError, KeyError, TypeError):
        # Missing/unreadable file, invalid JSON, or a file without the
        # 'extra' section: there is simply no cached name.
        return "Unknown"
    # A null or empty cached name is as useless as "Unknown".
    return name if isinstance(name, str) and name else "Unknown"


def _sum_category(stats, category):
    """Sum all counters stored under one statistics category (0 if absent)."""
    return sum(stats.get(category, {}).values())


def process_player(filename):
    """Download one player's stats file, enrich it, save it, and summarise it.

    Steps: download ``BASE_URL + filename``; resolve the player name (from
    the locally cached copy when available, otherwise via
    ``get_player_name``); compute aggregate statistics; write the
    downloaded JSON plus an ``extra`` section to ``STATS_DIR/filename``.

    Args:
        filename: Name of the remote stats file, e.g. "<uuid>.json".

    Returns:
        A dict with ``uuid``, ``name``, ``avatar`` and ``stats`` keys for the
        summary, or None when the file cannot be downloaded, is empty, or
        is not a JSON object.
    """
    uuid = filename.replace(".json", "")
    json_path = os.path.join(STATS_DIR, filename)

    # 1. Download JSON (always re-fetched so the stats are up to date).
    try:
        r = session.get(BASE_URL + filename, timeout=10, auth=BASE_AUTH)
        if r.status_code != 200:
            print(f"Failed to download {filename}")
            return None
        data = r.json()
    except Exception as e:
        print(f"Error downloading {filename}: {e}")
        return None

    if not data:
        return None
    if not isinstance(data, dict):
        # The code below needs a JSON object; skip anything else instead of
        # crashing the whole run on data.get().
        print(f"Unexpected stats format in {filename}")
        return None

    # 2. Get name. Reuse the name saved by an earlier run to avoid API calls.
    player_name = _load_cached_name(json_path)
    if player_name == "Unknown":
        player_name = get_player_name(uuid) or "Unknown"
        # Sleep slightly to be nice to the name-lookup APIs.
        time.sleep(0.1)

    # 3. Download Avatar - SKIPPED to avoid rate limits.
    # The frontend loads avatars dynamically from the URL returned below.

    # 4. Calculate stats.
    stats = data.get('stats', {})
    custom = stats.get('minecraft:custom', {})

    walk_cm = custom.get('minecraft:walk_one_cm', 0)
    walk_fmt = _format_distance(walk_cm)

    # Play time is stored in ticks (1 tick = 1/20 second).
    play_time_ticks = custom.get('minecraft:play_time', 0)
    play_time_fmt = _format_play_time(play_time_ticks)

    total_mined = _sum_category(stats, 'minecraft:mined')
    # "Placed" is the sum of every counter under minecraft:used.
    total_placed = _sum_category(stats, 'minecraft:used')
    # Deaths are counted from minecraft:killed_by, kills from minecraft:killed.
    total_deaths = _sum_category(stats, 'minecraft:killed_by')
    total_kills = _sum_category(stats, 'minecraft:killed')

    # Inject the derived values into the JSON that gets saved locally.
    data['extra'] = {
        'player_name': player_name,
        'formatted_walk': walk_fmt,
        'walk_cm': walk_cm,
        'total_mined': total_mined,
        'total_placed': total_placed,
        'total_deaths': total_deaths,
        'total_kills': total_kills,
        'play_time_fmt': play_time_fmt,
        'play_time_ticks': play_time_ticks,
    }

    # Save
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

    # The avatar URL is keyed by name when known, otherwise by UUID.
    avatar_id = player_name if player_name != "Unknown" else uuid
    return {
        'uuid': uuid,
        'name': player_name,
        'avatar': f"https://minotar.net/avatar/{avatar_id}/64",
        'stats': {
            'walk_fmt': walk_fmt,
            'walk_raw': walk_cm,
            'mined': total_mined,
            'placed': total_placed,
            'deaths': total_deaths,
            'kills': total_kills,
            'play_time_fmt': play_time_fmt,
            'play_time_raw': play_time_ticks,
        },
    }
# Process sequentially with progress bar.
results = []
if files:
    for filename in tqdm(files, desc="Processing players"):
        result = process_player(filename)
        if result is not None:
            results.append(result)

# Sort by name. The key is coerced to a string so that a missing (None) name
# cannot raise TypeError and discard the whole run; the UUID breaks ties so
# players sharing a name (e.g. "Unknown") keep a stable order between runs.
results.sort(key=lambda x: (str(x['name'] or ""), x['uuid']))

summary = {
    'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'players': results,
}

summary_path = os.path.join(STATS_DIR, 'summary.json')
with open(summary_path, 'w', encoding='utf-8') as f:
    json.dump(summary, f, ensure_ascii=False, indent=4)

# Report the path that was actually written rather than a hard-coded one.
print(f"Processing complete. Summary saved to {summary_path}")