s07
双平台仿真
仿真层Twitter + Reddit 并行模拟引擎
子进程架构实现双平台并行,JSONL 日志实时监控使用独立子进程运行 OASIS 仿真脚本,主进程通过监控线程解析 JSONL 日志获取实时状态,实现松耦合架构
Twitter + Reddit 并行模拟引擎
子进程架构实现双平台并行,JSONL 日志实时监控使用独立子进程运行 OASIS 仿真脚本,主进程通过监控线程解析 JSONL 日志获取实时状态,实现松耦合架构
MiroFish 需要同时在 Twitter 和 Reddit 两个模拟平台上运行 OASIS 仿真。主进程是 Flask Web 服务器,不能被仿真阻塞。SimulationRunner 通过子进程架构解耦仿真执行和 Web 服务,用 JSONL 日志文件作为松耦合的数据通道。
子进程架构:Flask 主进程启动 OASIS 脚本作为独立子进程
class SimulationRunner:
_run_states: Dict[str, SimulationRunState] = {}
_processes: Dict[str, subprocess.Popen] = {}
_monitor_threads: Dict[str, threading.Thread] = {}
@classmethod
def start_simulation(cls, simulation_id, ...):
# 启动 Twitter 仿真子进程
twitter_proc = subprocess.Popen(
['python', 'scripts/run_twitter_simulation.py', ...],
stdout=stdout_file, stderr=stderr_file)
# 启动 Reddit 仿真子进程
reddit_proc = subprocess.Popen(
['python', 'scripts/run_reddit_simulation.py', ...],
stdout=stdout_file, stderr=stderr_file)
# 启动监控线程(读取 JSONL 日志)
monitor = threading.Thread(
target=cls._monitor_simulation, args=(simulation_id,))
monitor.start()监控线程:实时解析 JSONL 日志,更新运行状态
@classmethod
def _monitor_simulation(cls, simulation_id):
"""后台线程:读取 actions.jsonl,解析 Agent 动作"""
while state.runner_status == RunnerStatus.RUNNING:
# 读取新增的 JSONL 行
for line in new_lines:
data = json.loads(line)
action = AgentAction(
round_num=data['round'],
platform=data['platform'],
agent_id=data['agent_id'],
agent_name=data['agent_name'],
action_type=data['action_type'],
action_args=data.get('action_args', {}))
state.add_action(action)
# 同时推送到 Zep 记忆更新器
if graph_memory_updater:
graph_memory_updater.add_activity_from_dict(data, platform)
time.sleep(1)