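#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Repository page header captured together with this file (commit summary,
# translated from Chinese):
# - Add final verification and migration test scripts to ensure the DSL
#   engine is complete and functional.
# - Implement the story module's audio configuration, character definitions,
#   and emotional story module.
# - Migrate existing story data to the new DSL format, with support for
#   dynamic content and conditional navigation.
# - Update the theme and UI components for a consistent black background and
#   dark theme.
# - Add an audio manager and performance-monitoring tools to improve the
#   gameplay experience and debugging.
# Listing metadata: 537 lines, 20 KiB, Python.
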
import argparse
import os
import re
from collections import deque
from typing import Dict, List, Tuple, Set


# Line-level patterns of the story DSL, applied with ``match`` to one line at a time.
# ``@node <id>``: starts a node; the id is limited to ASCII letters, digits and "_".
NODE_RE = re.compile(r'^\s*@node\s+(?P<id>[A-Za-z0-9_]+)\s*$')
# ``@title "<text>"``: the greedy ``.*`` keeps any inner double quotes in the title.
TITLE_RE = re.compile(r'^\s*@title\s+"(?P<title>.*)"\s*$')
# ``choice_<n>: "<label>" -> <target>``: not anchored at the end, so trailing
# text after the target id is ignored.
CHOICE_RE = re.compile(
    r'^\s*choice_\d+\s*:\s*"(?P<label>.*?)"\s*->\s*(?P<target>[A-Za-z0-9_]+)'
)


# ---- Multi-page patterns & helpers ----
# Node-id suffixes that mark a follow-up page: foo_p2, foo_part2, foo_final.
# Each pattern captures the id without that suffix in the "base" group.
SUFFIX_PATTERNS = [
    re.compile(r'^(?P<base>.+)_p\d+$'),
    re.compile(r'^(?P<base>.+)_part\d+$'),
    re.compile(r'^(?P<base>.+)_(?:final)$'),
]

# Title pagination markers such as (1/3) or the full-width form （1/3）.
# Both parenthesis styles are accepted because titles are written in Chinese.
TITLE_PAGE_RE = re.compile(r'[(（]\s*\d+\s*/\s*\d+\s*[)）]')

# Edge labels starting with one of these prefixes are treated as
# pagination/continuation links and can be merged away.
CONTINUE_LABEL_PREFIXES: Tuple[str, ...] = (
    '继续阅读', '继续查看', '继续听', '继续',
)


def parse_story(path: str) -> Tuple[Dict[str, str], Dict[str, List[Tuple[str, str]]]]:
    """Parse a .story file into node titles and outgoing choice edges.

    The file is read as UTF-8, one line at a time. A line matching ``NODE_RE``
    starts a node; until the next such line, ``TITLE_RE`` lines set that
    node's title and ``CHOICE_RE`` lines add outgoing edges. Lines before the
    first ``@node`` are ignored.

    Args:
        path: Path of the story file to read.

    Returns:
        A ``(nodes, edges)`` tuple:
        - nodes: id -> title (the id itself when no non-empty title was seen)
        - edges: src_id -> list of (label, target_id), in file order
    """
    nodes: Dict[str, str] = {}
    edges: Dict[str, List[Tuple[str, str]]] = {}
    current_id = ''

    with open(path, 'r', encoding='utf-8') as f:
        for raw in f:
            line = raw.rstrip('\n')

            m_node = NODE_RE.match(line)
            if m_node:
                current_id = m_node.group('id')
                # A repeated @node keeps the title/edges collected so far.
                nodes.setdefault(current_id, current_id)
                edges.setdefault(current_id, [])
                continue

            if not current_id:
                # Nothing can be attached before the first @node line.
                continue

            m_title = TITLE_RE.match(line)
            if m_title:
                title = m_title.group('title').strip()
                if title:
                    # The last non-empty @title inside a node wins.
                    nodes[current_id] = title

            m_choice = CHOICE_RE.match(line)
            if m_choice:
                label = m_choice.group('label').strip()
                target = m_choice.group('target').strip()
                edges[current_id].append((label, target))

    return nodes, edges


def mermaid_escape(text: str) -> str:
    """Return *text* with every double quote replaced by a single quote.

    Labels are emitted inside double-quoted Mermaid strings, so a literal
    double quote would end the label early.
    """
    return text.replace('"', "'")


def extract_base_id(node_id: str) -> str:
    """Return *node_id* without its page suffix, or unchanged if it has none.

    The patterns in ``SUFFIX_PATTERNS`` are tried in order and only the first
    match is applied, so at most one suffix is removed per call.
    """
    for pattern in SUFFIX_PATTERNS:
        match = pattern.match(node_id)
        if match:
            return match.group('base')
    return node_id


class UnionFind:
    """Disjoint-set structure over string keys, with path compression.

    Keys are registered lazily: the first ``find`` on an unknown key makes it
    the root of its own set.
    """

    def __init__(self) -> None:
        # Maps each known key to its parent; a root maps to itself.
        self.parent: Dict[str, str] = {}

    def find(self, x: str) -> str:
        """Return the root of the set containing *x*, registering *x* if new.

        Implemented with loops rather than recursion so that a long parent
        chain cannot exceed the interpreter's recursion limit.
        """
        parent = self.parent
        if x not in parent:
            parent[x] = x
            return x

        # First pass: walk up to the root.
        root = x
        while parent[root] != root:
            root = parent[root]

        # Second pass: point every key on the walked path directly at the root.
        while x != root:
            next_key = parent[x]
            parent[x] = root
            x = next_key
        return root

    def union(self, a: str, b: str) -> None:
        """Merge the sets of *a* and *b*; the root of *a* becomes the joint root."""
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[rb] = ra


def clean_title_for_display(title: str) -> str:
    """Return *title* with every ``TITLE_PAGE_RE`` marker removed.

    Surrounding whitespace is stripped after the markers are removed.
    """
    without_markers = TITLE_PAGE_RE.sub('', title)
    return without_markers.strip()


def is_continue_label(label: str) -> bool:
    """Return True if *label* starts with one of ``CONTINUE_LABEL_PREFIXES``."""
    # str.startswith accepts a tuple of candidate prefixes.
    return label.startswith(CONTINUE_LABEL_PREFIXES)


def build_mermaid(
    nodes: Dict[str, str],
    edges: Dict[str, List[Tuple[str, str]]],
    *,
    root_id: str = 'first_awakening',
    detach_branches: bool = True,
) -> str:
    """Render a parsed story graph as Mermaid ``flowchart TD`` text.

    Processing steps, in order:

    1. Add placeholder nodes for undefined edge targets, and give nodes whose
       title is still their id the shortest incoming edge label as title.
    2. Merge multi-page nodes into groups (id suffixes, continuation labels,
       identical base ids) and rebuild the edges between groups.
    3. Redirect every edge that points at ``root_id`` to a dedicated restart
       node.
    4. Optionally detach heuristic "branch" clusters from the main line.
    5. Emit one subgraph for the main line, one per branch cluster and one
       for the restart node.

    Edges whose endpoints fall into different subgraphs are not drawn. For
    main-line nodes they are summarised as "jump" hints in the node label.
    Groups that are neither reachable from the root nor part of a branch
    cluster are not rendered at all.

    All iteration over sets is sorted, so the same input always gives the
    same text regardless of string hash randomisation.

    Args:
        nodes: id -> title. Mutated in place: placeholder entries are added
            and id-only titles may be replaced.
        edges: src_id -> list of (label, target_id). Mutated in place: every
            placeholder target gets an empty edge list.
        root_id: Group id of the start node. If no such group exists, every
            group is treated as part of the main line.
        detach_branches: When False no branch roots are selected, so the
            output has only the main-line and restart subgraphs.

    Returns:
        The Mermaid source, lines joined with newlines.
    """
    RESTART_NODE_ID = 'restart_cycle_end'
    RESTART_TITLE = '重新开始(进入下一次循环)'

    # Title keywords that mark a group as the start of an ending branch.
    ENDING_KEYWORDS = (
        '结局', '最终', '终极', '共存', '宽恕', '拯救', '新文明', '宇宙', '学院', '网络', '守护', '转型', '推广'
    )
    # NOTE(review): ids are compared after page merging, so the entries that
    # carry a page suffix (_p2, _final) can presumably only match when the
    # group id kept that suffix -- confirm whether base ids were intended.
    ENDING_IDS = (
        'coexistence_path', 'coexistence_path_p2',
        'healing_civilization_final', 'harmony_guardians_final',
        'reality_guardians', 'cosmic_guardians_ending', 'universal_rescue_ending'
    )
    FORCED_BRANCH_IDS = {
        'eva_identity_revelation', 'hidden_marks_discovery', 'find_sara'
    }
    FORCED_BRANCH_TITLE_KEYWORDS = (
        '伊娃的身份揭示', '身份揭示', '植入物', '痕迹', '寻找萨拉', '第48次循环的完美策略'
    )
    DEPTH_THRESHOLD = 6
    MAX_EDGES_FROM_MAIN = 1
    MAX_EDGES_TO_MAIN = 1

    # Id sets used to split the "identity revelation" branch a second time.
    IDENTITY_IDS = {'eva_identity_revelation', 'eva_revelation'}
    EMO_IDS = {'emotional_reunion', 'memory_sharing', 'identity_exploration', 'rescue_planning'}
    QUANTUM_IDS = {'consciousness_communication', 'consciousness_integration', 'eva_quantum_link'}

    # -------- Normalise the input graph (mutates the caller's dicts) --------
    # An edge source without a node entry would raise KeyError in the group
    # lookup further down, so register it under its own id.
    for src in edges:
        nodes.setdefault(src, src)

    # Undefined targets become placeholder nodes titled with their id. Sorted
    # so the dict insertion order does not depend on set iteration order.
    referenced = {tgt for lst in edges.values() for _label, tgt in lst}
    for tgt in sorted(referenced):
        if tgt not in nodes:
            nodes[tgt] = tgt
            edges.setdefault(tgt, [])

    # A node whose title is still its own id takes the shortest non-blank
    # incoming edge label as display title (first one wins on ties).
    incoming_labels: Dict[str, List[str]] = {}
    for lst in edges.values():
        for label, tgt in lst:
            if label.strip():
                incoming_labels.setdefault(tgt, []).append(label)
    for node_id, title in list(nodes.items()):
        if title.strip() != node_id.strip():
            continue
        best = min(incoming_labels.get(node_id, ()), key=len, default=None)
        if best is not None:
            nodes[node_id] = best

    # -------- Merge multi-page nodes and continuation edges --------
    uf = UnionFind()

    # 1) Merge by suffix patterns (id-based grouping)
    for node_id in nodes:
        base = extract_base_id(node_id)
        if base != node_id:
            uf.union(base, node_id)

    for src, lst in edges.items():
        for label, tgt in lst:
            # 2) Merge by edges labeled as continuation
            if is_continue_label(label):
                uf.union(src, tgt)
            # 3) Also merge if base ids are identical (pagination chain)
            if src != tgt and extract_base_id(src) == extract_base_id(tgt):
                uf.union(src, tgt)

    groups: Dict[str, List[str]] = {}
    for node_id in nodes:
        groups.setdefault(uf.find(node_id), []).append(node_id)

    node_to_group: Dict[str, str] = {}
    group_titles: Dict[str, str] = {}
    for rep, members in groups.items():
        # The group id is the base id of the union-find representative.
        gid = extract_base_id(rep)
        for member in members:
            node_to_group[member] = gid

        # Prefer a member title without a pagination marker, then any
        # non-empty title, then the group id itself.
        titles = [nodes[member].strip() for member in members]
        chosen = next((t for t in titles if t and not TITLE_PAGE_RE.search(t)), '')
        if not chosen:
            chosen = next((t for t in titles if t), '')
        group_titles[gid] = clean_title_for_display(chosen or gid)

    # -------- Rebuild edges at group level --------
    # Edges inside one group and continuation edges are dropped; edges back
    # to the root are redirected to the restart node.
    new_edges: Set[Tuple[str, str, str]] = set()
    for src, lst in edges.items():
        gsrc = node_to_group[src]
        for label, tgt in lst:
            gtgt = node_to_group[tgt]
            if gsrc == gtgt or is_continue_label(label):
                continue
            if gtgt == root_id:
                new_edges.add((gsrc, '重新开始', RESTART_NODE_ID))
            else:
                new_edges.add((gsrc, mermaid_escape(label), gtgt))
    group_titles[RESTART_NODE_ID] = RESTART_TITLE

    # One sorted list drives every later pass, which keeps adjacency order,
    # cluster growth and the emitted text reproducible.
    sorted_edges = sorted(new_edges)

    adj: Dict[str, List[Tuple[str, str]]] = {}
    rev: Dict[str, List[Tuple[str, str]]] = {}
    for s, l, t in sorted_edges:
        adj.setdefault(s, []).append((l, t))
        rev.setdefault(t, []).append((l, s))

    # -------- Main line: groups reachable from the root --------
    depth: Dict[str, int] = {}
    if root_id in group_titles:
        depth = _bfs_depths(root_id, adj)
        main_nodes: Set[str] = set(depth)
    else:
        # Fallback: include all nodes if root not found
        main_nodes = set(group_titles)

    # -------- Identify branch roots (heuristic) --------
    branch_roots: Set[str] = set()
    if detach_branches:
        # Reachable groups whose title looks like an ending.
        for gid, title in group_titles.items():
            if gid == root_id or gid not in main_nodes:
                continue
            if any(k in title for k in ENDING_KEYWORDS):
                branch_roots.add(gid)

        # Reachable groups whose id follows a pattern often used for endings.
        for gid in main_nodes:
            if gid.startswith('ending_') or gid in ENDING_IDS:
                branch_roots.add(gid)

        # Forced roots by id or title keyword, regardless of reachability.
        for gid, title in group_titles.items():
            if gid in FORCED_BRANCH_IDS or any(kw in title for kw in FORCED_BRANCH_TITLE_KEYWORDS):
                branch_roots.add(gid)

        # Deep and weakly-connected groups become new branch roots.
        for gid in main_nodes:
            if gid == root_id or depth.get(gid, 0) < DEPTH_THRESHOLD:
                continue
            in_from_main = sum(1 for _l, s in rev.get(gid, []) if s in main_nodes)
            out_to_main = sum(1 for _l, t in adj.get(gid, []) if t in main_nodes)
            if in_from_main <= MAX_EDGES_FROM_MAIN and out_to_main <= MAX_EDGES_TO_MAIN:
                branch_roots.add(gid)

    # -------- Grow branch clusters from roots --------
    branch_clusters: List[Tuple[str, Set[str]]] = []  # (root, nodes)
    used_in_cluster: Set[str] = set()
    for br in sorted(branch_roots):
        if br in used_in_cluster:
            continue
        cluster: Set[str] = set()
        queue = deque([br])
        while queue:
            u = queue.popleft()
            if u in cluster:
                continue
            cluster.add(u)
            for _l, v in adj.get(u, []):
                if v in cluster:
                    continue
                # Keep the expansion to the weakly connected area: skip
                # successors with many incoming edges from outside.
                in_from_outside = sum(
                    1 for _l2, s in rev.get(v, [])
                    if s not in cluster and s in main_nodes
                )
                if in_from_outside <= MAX_EDGES_FROM_MAIN + 1:
                    queue.append(v)
        # Clusters smaller than three groups stay in the main line.
        if len(cluster) >= 3:
            branch_clusters.append((br, cluster))
            used_in_cluster |= cluster

    for _br, cset in branch_clusters:
        main_nodes -= cset

    # Main-line nodes leading into a cluster get a "jump" hint, because the
    # edge itself is not drawn across subgraphs.
    jump_tips: Dict[str, List[str]] = {}
    for s, _l, t in sorted_edges:
        if s not in main_nodes:
            continue
        for br, cset in branch_clusters:
            if t in cset:
                tip = f"跳转分支:{group_titles[br]}"
                tips = jump_tips.setdefault(s, [])
                if tip not in tips:
                    tips.append(tip)

    # -------- Build Mermaid text --------
    lines: List[str] = [
        "%%{init: {'flowchart': {'htmlLabels': true}}}%%",
        'flowchart TD',
        '',
    ]

    # Main cluster
    lines.append(' subgraph cluster_main[主线:第一次觉醒]')
    for gid in sorted(main_nodes):
        lines.append(_node_line(gid, group_titles[gid], jump_tips.get(gid, [])))
    for s, l, t in sorted_edges:
        if s in main_nodes and t in main_nodes:
            lines.append(_edge_line(s, l, t))
    lines.append(' end')

    # Branch clusters
    for idx, (br, nodeset) in enumerate(branch_clusters, start=1):
        title = group_titles[br]
        lines.append(f' subgraph cluster_branch_{idx}[分支:{mermaid_escape(title)}]')

        # Special handling: the large "identity revelation" branch is split
        # into fixed subsets by id.
        if br in IDENTITY_IDS or ('身份' in title and '揭示' in title):
            # When no identity id is in the cluster the branch root stands in
            # for the core subset; it is removed from the other subsets so it
            # is rendered only once.
            core = {gid for gid in nodeset if gid in IDENTITY_IDS} or {br}
            emo = {gid for gid in nodeset if gid in EMO_IDS} - core
            quantum = {gid for gid in nodeset if gid in QUANTUM_IDS} - core
            other = nodeset - core - emo - quantum

            # Edges from the core subset into another subset become hints.
            intra_jump_tips: Dict[str, List[str]] = {}
            for s, _l, t in sorted_edges:
                if s not in core:
                    continue
                for target_subset, tip in (
                    (emo, '跳转分支:情感-记忆'),
                    (quantum, '跳转分支:意识-量子'),
                ):
                    if t in target_subset:
                        tips = intra_jump_tips.setdefault(s, [])
                        if tip not in tips:
                            tips.append(tip)

            for name, subset in (
                ('身份揭示主簇', core),
                ('情感-记忆', emo),
                ('意识-量子', quantum),
                ('其他', other),
            ):
                if not subset:
                    continue
                lines.append(f' subgraph cluster_branch_{idx}_{name}[{name}]')
                for gid in sorted(subset):
                    lines.append(_node_line(gid, group_titles[gid], intra_jump_tips.get(gid, [])))
                for s, l, t in sorted_edges:
                    if s in subset and t in subset:
                        lines.append(_edge_line(s, l, t))
                lines.append(' end')

            # Cross-subset edges are not rendered, to avoid crowding.
            lines.append(' end')
            continue

        # ---------- Default: bucket into stages by depth ----------
        # Depth is counted from the branch root along edges inside the cluster.
        local_depth = _bfs_depths(br, adj, allowed=nodeset)
        buckets: Dict[int, Set[str]] = {}
        for gid in nodeset:
            buckets.setdefault(local_depth.get(gid, 0), set()).add(gid)

        for stage in sorted(buckets):
            stage_nodes = buckets[stage]
            lines.append(f' subgraph cluster_branch_{idx}_stage_{stage}[stage {stage}]')
            for gid in sorted(stage_nodes):
                lines.append(_node_line(gid, group_titles[gid], []))
            # stage internal edges
            for s, l, t in sorted_edges:
                if s in stage_nodes and t in stage_nodes:
                    lines.append(_edge_line(s, l, t))
            lines.append(' end')

        # cross-stage edges within the branch
        for s, l, t in sorted_edges:
            if s in nodeset and t in nodeset and local_depth.get(s, 0) != local_depth.get(t, 0):
                lines.append(_edge_line(s, l, t))

        lines.append(' end')

    # Restart cluster (sink)
    lines.append(' subgraph cluster_restart[循环重启]')
    lines.append(_node_line(RESTART_NODE_ID, RESTART_TITLE, []))
    lines.append(' end')

    lines.append('')
    return '\n'.join(lines)


def _bfs_depths(start: str, adj: Dict[str, List[Tuple[str, str]]], allowed=None) -> Dict[str, int]:
    """Return the shortest hop count from *start* to every node reachable via *adj*.

    When *allowed* is given, only nodes contained in it are entered.
    """
    depths: Dict[str, int] = {start: 0}
    queue = deque([start])
    while queue:
        u = queue.popleft()
        for _label, v in adj.get(u, ()):
            if v in depths or (allowed is not None and v not in allowed):
                continue
            depths[v] = depths[u] + 1
            queue.append(v)
    return depths


def _node_line(gid: str, title: str, tips: List[str]) -> str:
    """Return the Mermaid declaration of one node, with hints on a second label line."""
    extra = '\n' + '、'.join(tips) if tips else ''
    label = mermaid_escape(title + extra)
    return f' {gid}["{label}"]'


def _edge_line(src: str, label: str, dst: str) -> str:
    """Return the Mermaid line for one edge; an empty label gives a plain arrow."""
    if label:
        return f' {src} -- "{label}" --> {dst}'
    return f' {src} --> {dst}'


def main() -> None:
    """Command-line entry point: parse ``--input`` and write Mermaid text to ``--output``.

    The output file's parent directory is created when it does not exist. A
    one-line summary with the node and edge counts is printed at the end.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()

    nodes, edges = parse_story(args.input)
    # Force root as first_awakening; detach branches by default
    content = build_mermaid(nodes, edges, root_id='first_awakening', detach_branches=True)

    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    out_dir = os.path.dirname(args.output)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(args.output, 'w', encoding='utf-8') as f:
        f.write(content)

    edge_count = sum(len(v) for v in edges.values())
    print(f"Generated Mermaid with {len(nodes)} nodes and {edge_count} edges → {args.output}")


# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()