添加DSL引擎迁移和验证功能

- 新增最终验证脚本和迁移测试脚本,确保DSL引擎的完整性和功能
- 实现故事模块的音频配置、角色定义和情感故事模块
- 迁移现有故事数据到新的DSL格式,支持动态内容和条件导航
- 更新主题和UI组件,确保一致的黑色背景和暗色主题
- 添加音频管理器和性能监控工具,提升游戏体验和调试能力
This commit is contained in:
2025-09-10 17:36:55 +08:00
parent a90cc1c5a9
commit 93400900c0
9 changed files with 6938 additions and 1202 deletions

View File

@@ -0,0 +1,536 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import re
from typing import Dict, List, Tuple, Set
NODE_RE = re.compile(r'^\s*@node\s+(?P<id>[A-Za-z0-9_]+)\s*$')
TITLE_RE = re.compile(r'^\s*@title\s+"(?P<title>.*)"\s*$')
CHOICE_RE = re.compile(
r'^\s*choice_\d+\s*:\s*"(?P<label>.*?)"\s*->\s*(?P<target>[A-Za-z0-9_]+)'
)
# ---- Multi-page patterns & helpers ----
# ID suffix patterns: foo_p2, foo_part2, foo_final
SUFFIX_PATTERNS = [
re.compile(r'^(?P<base>.+)_p\d+$'),
re.compile(r'^(?P<base>.+)_part\d+$'),
re.compile(r'^(?P<base>.+)_(?:final)$'),
]
# Title pagination markers like 1/3 or (1/3)
TITLE_PAGE_RE = re.compile(r'[\(]\s*\d+\s*/\s*\d+\s*[\)]')
# Edge labels considered as pagination/continuation and can be merged away
CONTINUE_LABEL_PREFIXES: Tuple[str, ...] = (
'继续阅读', '继续查看', '继续听', '继续'
)
def parse_story(path: str) -> Tuple[Dict[str, str], Dict[str, List[Tuple[str, str]]]]:
"""
Parse .story file and return:
- nodes: id -> title
- edges: src_id -> list of (label, target_id)
"""
with open(path, 'r', encoding='utf-8') as f:
lines = f.readlines()
nodes: Dict[str, str] = {}
edges: Dict[str, List[Tuple[str, str]]] = {}
current_id: str = ''
i = 0
n = len(lines)
while i < n:
line = lines[i].rstrip('\n')
m_node = NODE_RE.match(line)
if m_node:
current_id = m_node.group('id')
# Initialize node if not exists
if current_id not in nodes:
nodes[current_id] = current_id
if current_id not in edges:
edges[current_id] = []
# Scan until next @node or EOF, collecting title and choices
j = i + 1
while j < n:
l2 = lines[j].rstrip('\n')
# Stop when next node begins
if NODE_RE.match(l2):
break
# Title
m_title = TITLE_RE.match(l2)
if m_title:
title = m_title.group('title').strip()
if title:
nodes[current_id] = title
# Choices -> edges
m_choice = CHOICE_RE.match(l2)
if m_choice:
label = m_choice.group('label').strip()
target = m_choice.group('target').strip()
edges.setdefault(current_id, []).append((label, target))
j += 1
# continue from j (next node or EOF)
i = j
continue
i += 1
return nodes, edges
def mermaid_escape(text: str) -> str:
# Replace double quotes to avoid breaking labels
return text.replace('"', "'")
def extract_base_id(node_id: str) -> str:
for pat in SUFFIX_PATTERNS:
m = pat.match(node_id)
if m:
return m.group('base')
return node_id
class UnionFind:
def __init__(self):
self.parent: Dict[str, str] = {}
def find(self, x: str) -> str:
if x not in self.parent:
self.parent[x] = x
if self.parent[x] != x:
self.parent[x] = self.find(self.parent[x])
return self.parent[x]
def union(self, a: str, b: str):
ra, rb = self.find(a), self.find(b)
if ra != rb:
self.parent[rb] = ra
def clean_title_for_display(title: str) -> str:
# Remove pagination markers like 1/3
return TITLE_PAGE_RE.sub('', title).strip()
def is_continue_label(label: str) -> bool:
return any(label.startswith(prefix) for prefix in CONTINUE_LABEL_PREFIXES)
def build_mermaid(nodes: Dict[str, str], edges: Dict[str, List[Tuple[str, str]]], *, root_id: str = 'first_awakening', detach_branches: bool = True) -> str:
# Collect all referenced targets to add placeholder nodes if missing
referenced = set()
for src, lst in edges.items():
for _label, tgt in lst:
referenced.add(tgt)
# Add placeholders for undefined targets
for tgt in referenced:
if tgt not in nodes:
nodes[tgt] = f"{tgt}"
edges.setdefault(tgt, [])
# Derive incoming labels for placeholder nodes; use edge label as display title if node has no proper title
incoming_labels: Dict[str, List[str]] = {}
for src, lst in edges.items():
for label, tgt in lst:
if label:
incoming_labels.setdefault(tgt, []).append(label)
for node_id, title in list(nodes.items()):
if title.strip() == node_id.strip():
labels = incoming_labels.get(node_id, [])
if labels:
# pick the shortest non-empty label as node title
best = sorted((l for l in labels if l.strip()), key=lambda s: len(s))[0]
nodes[node_id] = best
# -------- Merge multi-page nodes and continuation edges --------
uf = UnionFind()
# 1) Merge by suffix patterns (id-based grouping)
for node_id in list(nodes.keys()):
base = extract_base_id(node_id)
if base != node_id:
uf.union(base, node_id)
# 2) Merge by edges labeled as continuation (e.g., 继续阅读)
for src, lst in edges.items():
for label, tgt in lst:
if is_continue_label(label):
uf.union(src, tgt)
# 3) Also merge if base ids are identical (pagination chain)
if extract_base_id(src) == extract_base_id(tgt) and src != tgt:
uf.union(src, tgt)
# Build groups
groups: Dict[str, List[str]] = {}
for node_id in nodes.keys():
rep = uf.find(node_id)
groups.setdefault(rep, []).append(node_id)
# Choose representative id per group (prefer base id of representative)
group_id_map: Dict[str, str] = {}
for rep, members in groups.items():
base_rep = extract_base_id(rep)
group_id_map[rep] = base_rep
# Map every node to its group representative id
node_to_group: Dict[str, str] = {}
for rep, members in groups.items():
gid = group_id_map[rep]
for m in members:
node_to_group[m] = gid
# Compute group titles (prefer a member title without pagination marker)
group_titles: Dict[str, str] = {}
for rep, members in groups.items():
gid = group_id_map[rep]
chosen = ''
# Prefer a member whose title does not contain (x/y)
for m in members:
t = nodes.get(m, '').strip()
if t and not TITLE_PAGE_RE.search(t):
chosen = t
break
if not chosen:
# Fallback to any non-empty title
for m in members:
t = nodes.get(m, '').strip()
if t:
chosen = t
break
if not chosen:
# Final fallback: use group id
chosen = gid
group_titles[gid] = clean_title_for_display(chosen)
# Rebuild edges at group level, dropping pagination edges and self-loops
new_edges: Set[Tuple[str, str, str]] = set()
for src, lst in edges.items():
gsrc = node_to_group[src]
for label, tgt in lst:
gtgt = node_to_group[tgt]
if gsrc == gtgt:
continue # internal after merge
if is_continue_label(label):
continue # drop continuation edges
elabel = mermaid_escape(label)
new_edges.add((gsrc, elabel, gtgt))
# -------- Redirect loops back to root → to a dedicated restart node --------
RESTART_NODE_ID = 'restart_cycle_end'
RESTART_TITLE = '重新开始(进入下一次循环)'
redirected_edges: Set[Tuple[str, str, str]] = set()
for (s, l, t) in new_edges:
if t == root_id:
redirected_edges.add((s, '重新开始', RESTART_NODE_ID))
else:
redirected_edges.add((s, l, t))
new_edges = redirected_edges
# Register restart node in titles map
group_titles[RESTART_NODE_ID] = RESTART_TITLE
# Build adjacency on merged graph
adj: Dict[str, List[Tuple[str, str]]] = {}
rev: Dict[str, List[Tuple[str, str]]] = {}
indeg: Dict[str, int] = {gid: 0 for gid in group_titles.keys()}
outdeg: Dict[str, int] = {gid: 0 for gid in group_titles.keys()}
for s, l, t in new_edges:
adj.setdefault(s, []).append((l, t))
rev.setdefault(t, []).append((l, s))
outdeg[s] = outdeg.get(s, 0) + 1
indeg[t] = indeg.get(t, 0) + 1
# Reachable set from root
main_nodes: Set[str] = set()
if root_id in group_titles:
q: List[str] = [root_id]
while q:
u = q.pop(0)
if u in main_nodes:
continue
main_nodes.add(u)
for _l, v in adj.get(u, []):
if v not in main_nodes:
q.append(v)
else:
# Fallback: include all nodes if root not found
main_nodes = set(group_titles.keys())
# Compute depth (shortest hops) from root within reachable graph
depth: Dict[str, int] = {}
if root_id in group_titles:
from collections import deque
dq = deque()
dq.append(root_id)
depth[root_id] = 0
while dq:
u = dq.popleft()
du = depth[u]
for _l, v in adj.get(u, []):
if v not in depth:
depth[v] = du + 1
dq.append(v)
# Identify branch roots (heuristic)
ENDING_KEYWORDS = (
'结局', '最终', '终极', '共存', '宽恕', '拯救', '新文明', '宇宙', '学院', '网络', '守护', '转型', '推广'
)
def is_branch_title(title: str) -> bool:
return any(k in title for k in ENDING_KEYWORDS)
branch_roots: Set[str] = set()
if detach_branches:
# Force-split branch roots by id/title keywords
FORCED_BRANCH_IDS = {
'eva_identity_revelation', 'hidden_marks_discovery', 'find_sara'
}
FORCED_BRANCH_TITLE_KEYWORDS = (
'伊娃的身份揭示', '身份揭示', '植入物', '痕迹', '寻找萨拉', '第48次循环的完美策略'
)
for gid, title in group_titles.items():
if gid == root_id:
continue
# candidate if titled like an ending cluster or low indegree/high outdegree leaf-ish
if is_branch_title(title) and gid in main_nodes:
branch_roots.add(gid)
# Also add nodes with id starting patterns often used for endings
for gid in list(main_nodes):
if gid.startswith('ending_') or gid in (
'coexistence_path', 'coexistence_path_p2',
'healing_civilization_final', 'harmony_guardians_final',
'reality_guardians', 'cosmic_guardians_ending', 'universal_rescue_ending'
):
branch_roots.add(gid)
# Add forced branch roots (by id or title keywords), regardless of heuristics
for gid, title in group_titles.items():
if gid in FORCED_BRANCH_IDS:
branch_roots.add(gid)
else:
if any(kw in title for kw in FORCED_BRANCH_TITLE_KEYWORDS):
branch_roots.add(gid)
# Heuristic: deep and weakly-connected nodes become new branch roots
DEPTH_THRESHOLD = 6
MAX_EDGES_FROM_MAIN = 1
MAX_EDGES_TO_MAIN = 1
for gid in list(main_nodes):
if gid == root_id:
continue
d = depth.get(gid, 0)
if d < DEPTH_THRESHOLD:
continue
in_from_main = sum(1 for _l, s in rev.get(gid, []) if s in main_nodes)
out_to_main = sum(1 for _l, t in adj.get(gid, []) if t in main_nodes)
if in_from_main <= MAX_EDGES_FROM_MAIN and out_to_main <= MAX_EDGES_TO_MAIN:
branch_roots.add(gid)
# Grow branch clusters from roots
branch_clusters: List[Tuple[str, Set[str]]] = [] # (root, nodes)
used_in_cluster: Set[str] = set()
for br in sorted(branch_roots):
if br in used_in_cluster:
continue
cluster: Set[str] = set()
q = [br]
while q:
u = q.pop(0)
if u in cluster:
continue
cluster.add(u)
for _l, v in adj.get(u, []):
# avoid pulling back too many main nodes: keep expansion to weakly connected area
in_from_outside = sum(1 for _l, s in rev.get(v, []) if s not in cluster and s in main_nodes)
if v not in cluster and in_from_outside <= (MAX_EDGES_FROM_MAIN + 1):
q.append(v)
if len(cluster) >= 3:
branch_clusters.append((br, cluster))
used_in_cluster |= cluster
# Remove cluster nodes from main_nodes
for _br, cset in branch_clusters:
main_nodes -= cset
# Build label augmentation mapping for main nodes that lead to clusters
jump_tips: Dict[str, List[str]] = {}
for s, l, t in new_edges:
# Edge from main to cluster root -> add jump tip
for br, cset in branch_clusters:
if s in main_nodes and t in cset:
tip = f"跳转分支:{group_titles[br]}"
jump_tips.setdefault(s, [])
if tip not in jump_tips[s]:
jump_tips[s].append(tip)
# Build Mermaid text
lines: List[str] = []
lines.append("%%{init: {'flowchart': {'htmlLabels': true}}}%%")
lines.append('flowchart TD')
lines.append('')
# Main cluster
lines.append(' subgraph cluster_main[主线:第一次觉醒]')
for gid in sorted(main_nodes):
title = group_titles[gid]
extra = ''
if gid in jump_tips:
extra = '\n' + ''.join(jump_tips[gid])
label = mermaid_escape(title + extra)
lines.append(f' {gid}["{label}"]')
# Main edges
for (s, l, t) in sorted(new_edges):
if s in main_nodes and t in main_nodes:
if l:
lines.append(f' {s} -- "{l}" --> {t}')
else:
lines.append(f' {s} --> {t}')
lines.append(' end')
# Branch clusters with optional stage subgraphs by depth buckets to avoid overcrowding
for idx, (br, nodeset) in enumerate(branch_clusters, start=1):
title = group_titles[br]
lines.append(f' subgraph cluster_branch_{idx}[分支:{mermaid_escape(title)}]')
# Special handling: 二次拆分“伊娃的身份揭示”大分支
identity_ids = { 'eva_identity_revelation', 'eva_revelation' }
emo_ids = { 'emotional_reunion', 'memory_sharing', 'identity_exploration', 'rescue_planning' }
quantum_ids = { 'consciousness_communication', 'consciousness_integration', 'eva_quantum_link' }
is_identity_branch = (br in identity_ids) or ('身份' in title and '揭示' in title)
if is_identity_branch:
# Build subsets by id
subset_identity = {gid for gid in nodeset if gid in identity_ids}
subset_emo = {gid for gid in nodeset if gid in emo_ids}
subset_quantum = {gid for gid in nodeset if gid in quantum_ids}
subset_other = nodeset - subset_identity - subset_emo - subset_quantum
# Prepare jump tips within this branch
intra_jump_tips: Dict[str, List[str]] = {}
def add_tip(src_gid: str, tip: str):
intra_jump_tips.setdefault(src_gid, [])
if tip not in intra_jump_tips[src_gid]:
intra_jump_tips[src_gid].append(tip)
# Print subgraphs
def print_subset(name: str, subset: Set[str]):
if not subset:
return
lines.append(f' subgraph cluster_branch_{idx}_{name}[{name}]')
for gid in sorted(subset):
extra = ''
if gid in intra_jump_tips:
extra = '\n' + ''.join(intra_jump_tips[gid])
label = mermaid_escape(group_titles[gid] + extra)
lines.append(f' {gid}["{label}"]')
# internal edges
for (s, l, t) in sorted(new_edges):
if s in subset and t in subset:
if l:
lines.append(f' {s} -- "{l}" --> {t}')
else:
lines.append(f' {s} --> {t}')
lines.append(' end')
# Collect cross-subset edges to convert into jump tips (从身份主簇指向其他簇)
for (s, l, t) in sorted(new_edges):
if s in subset_identity and t in subset_emo:
add_tip(s, '跳转分支:情感-记忆')
if s in subset_identity and t in subset_quantum:
add_tip(s, '跳转分支:意识-量子')
# Render subsets
print_subset('身份揭示主簇', subset_identity or {br})
print_subset('情感-记忆', subset_emo)
print_subset('意识-量子', subset_quantum)
if subset_other:
print_subset('其他', subset_other)
# 不渲染跨子簇边,避免拥挤
lines.append(' end')
continue
# ---------- 默认:按深度 stage 分桶 ----------
# compute local depths from branch root
from collections import deque
local_depth: Dict[str, int] = {br: 0}
dq = deque([br])
while dq:
u = dq.popleft()
du = local_depth[u]
for _l, v in adj.get(u, []):
if v in nodeset and v not in local_depth:
local_depth[v] = du + 1
dq.append(v)
# bucket nodes by depth
buckets: Dict[int, List[str]] = {}
for n in nodeset:
d = local_depth.get(n, 0)
buckets.setdefault(d, []).append(n)
for stage in sorted(buckets.keys()):
lines.append(f' subgraph cluster_branch_{idx}_stage_{stage}[stage {stage}]')
for gid in sorted(buckets[stage]):
label = mermaid_escape(group_titles[gid])
lines.append(f' {gid}["{label}"]')
# stage internal edges
for (s, l, t) in sorted(new_edges):
if s in buckets[stage] and t in buckets[stage]:
if l:
lines.append(f' {s} -- "{l}" --> {t}')
else:
lines.append(f' {s} --> {t}')
lines.append(' end')
# cross-stage edges within the branch
for (s, l, t) in sorted(new_edges):
if s in nodeset and t in nodeset and local_depth.get(s, 0) != local_depth.get(t, 0):
if l:
lines.append(f' {s} -- "{l}" --> {t}')
else:
lines.append(f' {s} --> {t}')
lines.append(' end')
# Restart cluster (sink)
lines.append(f' subgraph cluster_restart[循环重启]')
lines.append(f' {RESTART_NODE_ID}["{mermaid_escape(RESTART_TITLE)}"]')
lines.append(' end')
lines.append('')
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
parser.add_argument('--output', required=True)
args = parser.parse_args()
nodes, edges = parse_story(args.input)
# Force root as first_awakening; detach branches by default
content = build_mermaid(nodes, edges, root_id='first_awakening', detach_branches=True)
os.makedirs(os.path.dirname(args.output), exist_ok=True)
with open(args.output, 'w', encoding='utf-8') as f:
f.write(content)
print(f"Generated Mermaid with {len(nodes)} nodes and {sum(len(v) for v in edges.values())} edges → {args.output}")
if __name__ == '__main__':
main()