import os import json import logging from pathlib import Path from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError from db import ( Intent, IntentExample, Story, StoryStep, CustomAction, SystemConfig ) from config import config # 初始化日志 logger = logging.getLogger(__name__) class RasaFileGenerator: """Rasa配置文件生成服务,带完整异常处理""" def __init__(self): """初始化生成器,从配置获取输出目录""" try: self.output_dir = config.get("file_storage.rasa_files_dir", "./rasa_files") # 创建输出目录 self._create_directories() logger.info(f"Rasa文件生成器初始化成功,输出目录: {self.output_dir}") except Exception as e: logger.error(f"初始化Rasa文件生成器失败: {str(e)}") raise Exception(f"初始化Rasa文件生成器失败: {str(e)}") def _create_directories(self) -> None: """创建必要的目录结构""" try: Path(self.output_dir).mkdir(parents=True, exist_ok=True) Path(f"{self.output_dir}/nlu").mkdir(parents=True, exist_ok=True) Path(f"{self.output_dir}/stories").mkdir(parents=True, exist_ok=True) Path(f"{self.output_dir}/actions").mkdir(parents=True, exist_ok=True) except OSError as e: raise Exception(f"创建目录失败: {str(e)}") def generate_all_files(self, session: Session) -> dict: """ 生成所有Rasa配置文件 Args: session: 数据库会话 Returns: 生成结果,包含文件路径和状态 """ try: # 生成各文件 domain_path = self.generate_domain_file(session) nlu_path = self.generate_nlu_file(session) story_paths = self.generate_stories_files(session) action_path = self.generate_actions_file(session) return { "success": True, "message": "所有文件生成成功", "output_dir": self.output_dir, "files": { "domain": domain_path, "nlu": nlu_path, "stories": story_paths, "actions": action_path } } except SQLAlchemyError as e: logger.error(f"数据库错误导致文件生成失败: {str(e)}") return { "success": False, "message": f"数据库错误: {str(e)}", } except OSError as e: logger.error(f"文件系统错误导致文件生成失败: {str(e)}") return { "success": False, "message": f"文件系统错误: {str(e)}", } except Exception as e: logger.error(f"文件生成失败: {str(e)}") return { "success": False, "message": f"生成文件时发生错误: {str(e)}", } def generate_domain_file(self, session: Session) -> str: """生成domain.yml文件""" try: domain_data = { "version": "3.1", "intents": [], "slots": {}, "forms": {}, "responses": {}, "actions": [] } # 获取指定版本的意图 intents = session.query(Intent).all() if not intents: logger.warning(f"未找到任何意图") # 添加意图 domain_data["intents"] = [intent.name for intent in intents] # 获取指定版本的自定义动作 actions = session.query(CustomAction).all() # 添加动作 domain_data["actions"] = [action.name for action in actions] # 实际应用中还需要添加槽位、表单和响应 # 写入文件 file_path = f"{self.output_dir}/domain.yml" with open(file_path, "w", encoding="utf-8") as f: self._write_yaml(f, domain_data) logger.info(f"生成domain文件: {file_path}") return file_path except Exception as e: logger.error(f"生成domain文件失败: {str(e)}") raise Exception(f"生成domain文件失败: {str(e)}") def generate_nlu_file(self, session: Session) -> str: """生成nlu.yml文件""" try: nlu_data = { "version": "3.1", "nlu": [] } # 获取指定版本的意图及样本 intents = session.query(Intent).all() for intent in intents: # 获取该意图指定版本的样本 examples = session.query(IntentExample).filter( IntentExample.intent_id == intent.id, ).all() # 构建examples字符串,使用正确的YAML多行文本格式 examples_str = "\n".join([f" - {ex.text}" for ex in examples]) intent_entry = { "intent": intent.name, "examples": examples_str } if intent.description: intent_entry["intent"] = f"{intent.name} # {intent.description}" nlu_data["nlu"].append(intent_entry) # 写入文件 file_path = f"{self.output_dir}/nlu/nlu.yml" with open(file_path, "w", encoding="utf-8") as f: self._write_yaml(f, nlu_data) logger.info(f"生成nlu文件: {file_path}") return file_path except Exception as e: logger.error(f"生成nlu文件失败: {str(e)}") raise Exception(f"生成nlu文件失败: {str(e)}") def generate_stories_files(self, session: Session) -> list[str]: """生成stories文件""" try: files = [] # 获取指定版本的故事 stories = session.query(Story).all() if not stories: logger.warning(f"未找到任何故事") # 生成合并的stories.yml all_stories = { "version": "3.1", "stories": [] } for story in stories: # 获取故事步骤 steps = session.query(StoryStep).filter( StoryStep.story_id == story.id ).order_by(StoryStep.step_order).all() # 构建故事内容 story_entry = { "story": story.name, "steps": [] } if story.description: story_entry["story"] = f"{story.name} # {story.description}" # 处理步骤 for step in steps: content = step.content_dict step_type = step.step_type if step_type == "intent": story_entry["steps"].append({"intent": content.get("name")}) elif step_type == "action": story_entry["steps"].append({"action": content.get("name")}) elif step_type == "form": if content.get("activate", True): story_entry["steps"].append({"action": content.get("name")}) else: story_entry["steps"].append({"action": f"form_deactivate_{content.get('name')}"}) # 处理其他类型的步骤... all_stories["stories"].append(story_entry) # 生成单个故事文件 story_file_name = story.name.lower().replace(" ", "_") + ".yml" story_file_path = f"{self.output_dir}/stories/{story_file_name}" with open(story_file_path, "w", encoding="utf-8") as f: self._write_yaml(f, { "version": "3.1", "stories": [story_entry] }) files.append(story_file_path) # 写入合并的stories.yml merged_file_path = f"{self.output_dir}/stories/stories.yml" with open(merged_file_path, "w", encoding="utf-8") as f: self._write_yaml(f, all_stories) files.append(merged_file_path) logger.info(f"生成stories文件: {len(files)} 个文件") return files except Exception as e: logger.error(f"生成stories文件失败: {str(e)}") raise Exception(f"生成stories文件失败: {str(e)}") def generate_actions_file(self, session: Session) -> str: """生成自定义动作Python文件""" try: actions = session.query(CustomAction).all() if not actions: logger.warning(f"未找到任何自定义动作") # 生成actions.py内容 code = [ "from rasa_sdk import Action, Tracker", "from rasa_sdk.executor import CollectingDispatcher", "from rasa_sdk.events import SlotSet", "import requests", "import json\n" ] for action in actions: class_name = f"Action{''.join(word.capitalize() for word in action.name.split('_'))}" # 生成类定义 code.append(f"class {class_name}(Action):") code.append(f" def name(self) -> str:") code.append(f" return \"{action.name}\"\n") # 生成run方法 code.append(f" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: dict) -> list:") code.append(" # 构建请求头") code.append(" headers = {}") # 添加Token if action.token: code.append(f" headers[\"Authorization\"] = \"Bearer {action.token}\"") # 添加自定义请求头 headers = action.headers_dict for key, value in headers.items(): code.append(f" headers[\"{key}\"] = \"{value}\"") # 添加内容类型 code.append(" headers[\"Content-Type\"] = \"application/json\"\n") # 构建请求体 if action.http_method in ["POST", "PUT"] and action.request_body: code.append(" # 构建请求体") code.append(f" payload = {action.request_body}") # 替换模板变量 code.append(" # 替换模板变量") code.append(" from jinja2 import Template") code.append(" payload_str = json.dumps(payload)") code.append(" template = Template(payload_str)") code.append(" payload = json.loads(template.render(tracker.slots))\n") # 发送请求 code.append(" # 发送请求") code.append(" try:") if action.http_method == "GET": code.append(f" response = requests.get(\"{action.api_url}\", headers=headers)") elif action.http_method == "POST": code.append(f" response = requests.post(\"{action.api_url}\", json=payload, headers=headers)") elif action.http_method == "PUT": code.append(f" response = requests.put(\"{action.api_url}\", json=payload, headers=headers)") elif action.http_method == "DELETE": code.append(f" response = requests.delete(\"{action.api_url}\", headers=headers)") else: code.append(f" dispatcher.utter_message(text=f\"不支持的HTTP方法: {action.http_method}\")") code.append(" return []") # 处理响应 code.append(" if response.status_code == 200:") code.append(" result = response.json()") # 处理响应映射 if action.response_mapping: code.append(" # 处理响应映射") code.append(f" mappings = {action.response_mapping}") code.append(" slot_events = []") code.append(" for slot, path in mappings.items():") code.append(" # 简化的路径解析") code.append(" value = result") code.append(" for part in path.split('.'):") code.append(" if isinstance(value, dict) and part in value:") code.append(" value = value[part]") code.append(" else:") code.append(" value = None") code.append(" break") code.append(" if value is not None:") code.append(" slot_events.append(SlotSet(slot, value))") code.append(" dispatcher.utter_message(text=str(result))") code.append(" return slot_events") else: code.append(" dispatcher.utter_message(text=str(result))") code.append(" else:") code.append(" dispatcher.utter_message(text=f\"API调用失败,状态码: {response.status_code}\")") code.append(" except Exception as e:") code.append(" dispatcher.utter_message(text=f\"调用API时发生错误: {str(e)}\")\n") code.append(" return []\n") # 写入文件 file_path = f"{self.output_dir}/actions/actions.py" with open(file_path, "w", encoding="utf-8") as f: f.write("\n".join(code)) logger.info(f"生成actions文件: {file_path}") return file_path except Exception as e: logger.error(f"生成actions文件失败: {str(e)}") raise Exception(f"生成actions文件失败: {str(e)}") def _write_yaml(self, file, data, indent: int = 0, reset: bool = False) -> None: """ 简单的YAML写入函数 Args: file: 文件对象 data: 要写入的数据 indent: 当前缩进 """ try: indent_str = "" if (indent != 0): indent_str = " " * indent if isinstance(data, dict): for key, value in data.items(): if isinstance(value, (dict, list)): file.write(f"{indent_str}{key}:\n") self._write_yaml(file, value, indent + 1) else: if (key != 'examples'): file.write(f"{key}: {self._format_yaml_value(value)}\n") else: file.write(f"{indent_str}{key}: {self._format_yaml_value(value)}\n") elif isinstance(data, list): for item in data: if isinstance(item, (dict, list)): file.write(f"{indent_str}- ") self._write_yaml(file, item, indent + 1, True) else: file.write(f"{indent_str}- {self._format_yaml_value(item)}\n") else: file.write(f"{self._format_yaml_value(data)}\n") except Exception as e: raise Exception(f"写入YAML数据失败: {str(e)}") def _format_yaml_value(self, value) -> str: """格式化YAML值""" if isinstance(value, str) and (":" in value or "\n" in value): return f"|{chr(10)}{value}" elif isinstance(value, str): return f'{value}' elif isinstance(value, bool): return "true" if value else "false" elif value is None: return "null" else: return str(value)