| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- import os
- import json
- import logging
- from pathlib import Path
- from sqlalchemy.orm import Session
- from sqlalchemy.exc import SQLAlchemyError
- from db import (
- Intent, IntentExample, Story, StoryStep, CustomAction,
- SystemConfig
- )
- from config import config
- # Module-level logger, named after this module via __name__.
- logger = logging.getLogger(__name__)
class RasaFileGenerator:
    """Generate Rasa project files from the records stored in the database.

    The generator writes ``domain.yml``, ``nlu/nlu.yml``, one YAML file per
    story plus a merged ``stories/stories.yml``, and ``actions/actions.py``
    below ``self.output_dir``.  YAML is produced by a small built-in emitter
    (``_write_yaml``) instead of a YAML library.
    """

    # Indentation unit used by the YAML emitter.
    _YAML_INDENT = "  "
    # Plain scalars a YAML parser resolves to null/bool rather than to a string.
    _YAML_RESERVED_WORDS = frozenset(
        {"", "~", "null", "true", "false", "yes", "no", "on", "off", "y", "n"}
    )
    # HTTP methods for which request code is generated (compared case-sensitively).
    _SUPPORTED_HTTP_METHODS = ("GET", "POST", "PUT", "DELETE")
    # Methods whose generated request carries a JSON payload.
    _BODY_HTTP_METHODS = ("POST", "PUT")

    class _Literal(str):
        """String that is emitted as a YAML literal block scalar (``|``)."""

    class _Commented(str):
        """String that is emitted with a trailing ``# comment`` on its line."""

        comment = ""

    def __init__(self):
        """Read the output directory from the configuration and create it.

        Raises:
            Exception: if the configuration cannot be read or the directory
                tree cannot be created (the original error is chained).
        """
        try:
            self.output_dir = config.get("file_storage.rasa_files_dir", "./rasa_files")
            self._create_directories()
            logger.info(f"Rasa文件生成器初始化成功,输出目录: {self.output_dir}")
        except Exception as e:
            logger.error(f"初始化Rasa文件生成器失败: {str(e)}")
            raise Exception(f"初始化Rasa文件生成器失败: {str(e)}") from e

    def _create_directories(self) -> None:
        """Create the output directory and its nlu/stories/actions subfolders.

        Raises:
            Exception: wrapping the ``OSError`` raised by ``mkdir``.
        """
        root = Path(self.output_dir)
        try:
            for directory in (root, root / "nlu", root / "stories", root / "actions"):
                directory.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise Exception(f"创建目录失败: {str(e)}") from e

    def generate_all_files(self, session: "Session") -> dict:
        """Generate every Rasa file and report the outcome.

        Args:
            session: database session used to load intents, stories and actions.

        Returns:
            A dict with ``success`` and ``message``; on success it also holds
            ``output_dir`` and the generated ``files``.  Errors are reported
            in the dict instead of being raised.
        """
        try:
            domain_path = self.generate_domain_file(session)
            nlu_path = self.generate_nlu_file(session)
            story_paths = self.generate_stories_files(session)
            action_path = self.generate_actions_file(session)
        except SQLAlchemyError as e:
            logger.error(f"数据库错误导致文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"数据库错误: {str(e)}",
            }
        except OSError as e:
            logger.error(f"文件系统错误导致文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"文件系统错误: {str(e)}",
            }
        except Exception as e:
            logger.error(f"文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"生成文件时发生错误: {str(e)}",
            }

        return {
            "success": True,
            "message": "所有文件生成成功",
            "output_dir": self.output_dir,
            "files": {
                "domain": domain_path,
                "nlu": nlu_path,
                "stories": story_paths,
                "actions": action_path,
            },
        }

    def generate_domain_file(self, session: "Session") -> str:
        """Write ``domain.yml`` listing all intent and custom action names.

        Slots, forms and responses are emitted as empty mappings.

        Args:
            session: database session.

        Returns:
            Path of the written file.

        Raises:
            SQLAlchemyError, OSError: propagated unchanged so that
                ``generate_all_files`` can tell them apart.
            Exception: any other failure, wrapped with a descriptive message.
        """
        try:
            intents = session.query(Intent).all()
            if not intents:
                logger.warning("未找到任何意图")
            actions = session.query(CustomAction).all()

            domain_data = {
                "version": "3.1",
                "intents": [intent.name for intent in intents],
                "slots": {},
                "forms": {},
                "responses": {},
                "actions": [action.name for action in actions],
            }

            file_path = f"{self.output_dir}/domain.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, domain_data)

            logger.info(f"生成domain文件: {file_path}")
            return file_path
        except (SQLAlchemyError, OSError) as e:
            # Keep the concrete type: the caller has dedicated handlers for it.
            logger.error(f"生成domain文件失败: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"生成domain文件失败: {str(e)}")
            raise Exception(f"生成domain文件失败: {str(e)}") from e

    def generate_nlu_file(self, session: "Session") -> str:
        """Write ``nlu/nlu.yml`` with one entry per intent.

        Args:
            session: database session.

        Returns:
            Path of the written file.

        Raises:
            SQLAlchemyError, OSError: propagated unchanged.
            Exception: any other failure, wrapped with a descriptive message.
        """
        try:
            nlu_data = {"version": "3.1", "nlu": []}

            for intent in session.query(Intent).all():
                examples = session.query(IntentExample).filter(
                    IntentExample.intent_id == intent.id
                ).all()
                nlu_data["nlu"].append(self._build_nlu_entry(intent, examples))

            file_path = f"{self.output_dir}/nlu/nlu.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, nlu_data)

            logger.info(f"生成nlu文件: {file_path}")
            return file_path
        except (SQLAlchemyError, OSError) as e:
            logger.error(f"生成nlu文件失败: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"生成nlu文件失败: {str(e)}")
            raise Exception(f"生成nlu文件失败: {str(e)}") from e

    def _build_nlu_entry(self, intent, examples) -> dict:
        """Build the ``{"intent": ..., "examples": ...}`` mapping for one intent.

        The examples become a literal block of ``- text`` lines regardless of
        how many there are; the intent description, if any, is attached as a
        YAML comment so it never becomes part of the intent name.
        """
        example_lines = []
        for example in examples:
            # A line break inside an example would split it into several items.
            text = " ".join(str(example.text).splitlines()).strip()
            if text:
                example_lines.append(f"- {text}")

        name = intent.name
        if intent.description:
            name = self._with_comment(name, intent.description)
        return {"intent": name, "examples": self._Literal("\n".join(example_lines))}

    def generate_stories_files(self, session: "Session") -> list[str]:
        """Write one YAML file per story plus the merged ``stories.yml``.

        Args:
            session: database session.

        Returns:
            Paths of all written files; the merged file is the last entry.

        Raises:
            SQLAlchemyError, OSError: propagated unchanged.
            Exception: any other failure, wrapped with a descriptive message.
        """
        try:
            files = []
            stories = session.query(Story).all()
            if not stories:
                logger.warning("未找到任何故事")

            all_stories = {"version": "3.1", "stories": []}
            # "stories" is reserved for the merged file written below.
            used_names = {"stories"}

            for story in stories:
                steps = session.query(StoryStep).filter(
                    StoryStep.story_id == story.id
                ).order_by(StoryStep.step_order).all()

                story_entry = self._build_story_entry(story, steps)
                all_stories["stories"].append(story_entry)

                story_file_name = self._story_file_name(story.name, used_names)
                story_file_path = f"{self.output_dir}/stories/{story_file_name}"
                with open(story_file_path, "w", encoding="utf-8") as f:
                    self._write_yaml(f, {"version": "3.1", "stories": [story_entry]})
                files.append(story_file_path)

            merged_file_path = f"{self.output_dir}/stories/stories.yml"
            with open(merged_file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, all_stories)
            files.append(merged_file_path)

            logger.info(f"生成stories文件: {len(files)} 个文件")
            return files
        except (SQLAlchemyError, OSError) as e:
            logger.error(f"生成stories文件失败: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"生成stories文件失败: {str(e)}")
            raise Exception(f"生成stories文件失败: {str(e)}") from e

    def _build_story_entry(self, story, steps) -> dict:
        """Build the ``{"story": ..., "steps": [...]}`` mapping for one story.

        Steps of type ``intent``, ``action`` and ``form`` are translated;
        any other step type is skipped.
        """
        name = story.name
        if story.description:
            name = self._with_comment(name, story.description)

        rendered_steps = []
        for step in steps:
            content = step.content_dict or {}
            step_type = step.step_type
            if step_type == "intent":
                rendered_steps.append({"intent": content.get("name")})
            elif step_type == "action":
                rendered_steps.append({"action": content.get("name")})
            elif step_type == "form":
                if content.get("activate", True):
                    rendered_steps.append({"action": content.get("name")})
                else:
                    # NOTE(review): Rasa 3.x normally deactivates a form with
                    # "active_loop: null"; confirm this action name is intended.
                    rendered_steps.append(
                        {"action": f"form_deactivate_{content.get('name')}"}
                    )
        return {"story": name, "steps": rendered_steps}

    @staticmethod
    def _story_file_name(name, used: set) -> str:
        """Return a unique, filesystem-safe ``<slug>.yml`` name for a story.

        Characters other than letters, digits, ``-``, ``_`` and ``.`` become
        ``_`` so the name cannot contain path separators.  ``used`` holds the
        slugs already taken and is updated in place.
        """
        slug = "".join(
            ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(name).lower()
        ) or "story"
        candidate = slug
        suffix = 2
        while candidate in used:
            candidate = f"{slug}_{suffix}"
            suffix += 1
        used.add(candidate)
        return f"{candidate}.yml"

    def generate_actions_file(self, session: "Session") -> str:
        """Write ``actions/actions.py`` with one ``Action`` class per record.

        Args:
            session: database session.

        Returns:
            Path of the written file.

        Raises:
            SQLAlchemyError, OSError: propagated unchanged.
            Exception: any other failure, wrapped with a descriptive message.
        """
        try:
            actions = session.query(CustomAction).all()
            if not actions:
                logger.warning("未找到任何自定义动作")

            source = self._render_actions_source(actions)

            file_path = f"{self.output_dir}/actions/actions.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(source)

            logger.info(f"生成actions文件: {file_path}")
            return file_path
        except (SQLAlchemyError, OSError) as e:
            logger.error(f"生成actions文件失败: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"生成actions文件失败: {str(e)}")
            raise Exception(f"生成actions文件失败: {str(e)}") from e

    def _render_actions_source(self, actions) -> str:
        """Return the complete source text of ``actions.py``."""
        code = [
            "from rasa_sdk import Action, Tracker",
            "from rasa_sdk.executor import CollectingDispatcher",
            "from rasa_sdk.events import SlotSet",
            "import requests",
            "import json\n",
        ]
        used_class_names = set()
        for action in actions:
            class_name = self._action_class_name(action.name, used_class_names)
            code.extend(self._render_action_class(action, class_name))
        return "\n".join(code)

    @staticmethod
    def _action_class_name(name, used: set) -> str:
        """Derive a unique, valid class name (``Action`` + CamelCase name).

        Characters that cannot appear in an identifier act as word
        separators, like ``_``.  ``used`` is updated in place.
        """
        cleaned = "".join(
            ch if f"_{ch}".isidentifier() else "_" for ch in str(name)
        )
        base = "Action" + "".join(word.capitalize() for word in cleaned.split("_"))
        candidate = base
        suffix = 2
        while candidate in used:
            candidate = f"{base}{suffix}"
            suffix += 1
        used.add(candidate)
        return candidate

    @staticmethod
    def _python_literal(value) -> str:
        """Return Python source text for a stored request body or mapping.

        JSON text is decoded and rendered with ``repr`` so that
        ``true``/``false``/``null`` become valid Python.  Already-decoded
        objects are rendered with ``repr`` directly.
        """
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except ValueError:
                # NOTE(review): legacy fallback - text that is not JSON is
                # pasted into the generated module verbatim, as before; it
                # must be a trusted Python literal.
                return value
        return repr(value)

    def _render_action_class(self, action, class_name: str) -> list:
        """Return the source lines of the ``Action`` subclass for one record.

        Every value taken from the record is embedded through ``repr`` so
        quotes or backslashes in it cannot break the generated module.
        """
        method = action.http_method
        lines = [
            f"class {class_name}(Action):",
            "    def name(self) -> str:",
            f"        return {str(action.name)!r}\n",
            "    def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: dict) -> list:",
        ]

        if method not in self._SUPPORTED_HTTP_METHODS:
            message = f"不支持的HTTP方法: {method}"
            lines.append(f"        dispatcher.utter_message(text={message!r})")
            lines.append("        return []\n")
            return lines

        lines.append("        # 构建请求头")
        lines.append("        headers = {}")
        if action.token:
            bearer = f"Bearer {action.token}"
            lines.append(f'        headers["Authorization"] = {bearer!r}')
        for key, value in (action.headers_dict or {}).items():
            lines.append(f"        headers[{str(key)!r}] = {str(value)!r}")
        lines.append('        headers["Content-Type"] = "application/json"\n')

        sends_body = method in self._BODY_HTTP_METHODS
        if sends_body and action.request_body:
            lines.extend([
                "        # 构建请求体",
                f"        payload = {self._python_literal(action.request_body)}",
                "        # 替换模板变量",
                "        from jinja2 import Template",
                "        payload_str = json.dumps(payload)",
                "        template = Template(payload_str)",
                "        payload = json.loads(template.render(tracker.slots))\n",
            ])
        elif sends_body:
            # The request call below always references ``payload``.
            lines.append("        payload = None\n")

        call_args = "json=payload, headers=headers" if sends_body else "headers=headers"
        lines.extend([
            "        # 发送请求",
            "        try:",
            f"            response = requests.{method.lower()}({str(action.api_url)!r}, {call_args})",
            "            if response.status_code == 200:",
            "                result = response.json()",
        ])

        if action.response_mapping:
            lines.extend([
                "                # 处理响应映射",
                f"                mappings = {self._python_literal(action.response_mapping)}",
                "                slot_events = []",
                "                for slot, path in mappings.items():",
                "                    # 简化的路径解析",
                "                    value = result",
                "                    for part in path.split('.'):",
                "                        if isinstance(value, dict) and part in value:",
                "                            value = value[part]",
                "                        else:",
                "                            value = None",
                "                            break",
                "                    if value is not None:",
                "                        slot_events.append(SlotSet(slot, value))",
                "                dispatcher.utter_message(text=str(result))",
                "                return slot_events",
            ])
        else:
            lines.append("                dispatcher.utter_message(text=str(result))")

        lines.extend([
            "            else:",
            '                dispatcher.utter_message(text=f"API调用失败,状态码: {response.status_code}")',
            "        except Exception as e:",
            '            dispatcher.utter_message(text=f"调用API时发生错误: {str(e)}")\n',
            "        return []\n",
        ])
        return lines

    @classmethod
    def _with_comment(cls, text, comment):
        """Return ``text`` as a string carrying ``comment`` for the emitter."""
        value = cls._Commented(text)
        value.comment = str(comment)
        return value

    def _write_yaml(self, file, data, indent: int = 0, reset: bool = False) -> None:
        """Write ``data`` (nested dicts, lists and scalars) as block-style YAML.

        Args:
            file: writable text file object.
            data: value to serialise.
            indent: nesting level of ``data``; one level is two spaces.
            reset: True when the caller has already written the ``- `` marker
                on the current line, so the first emitted line must not be
                indented again.

        Errors raised by ``file.write`` propagate unchanged.
        """
        pad = self._YAML_INDENT * indent

        if isinstance(data, dict):
            for position, (key, value) in enumerate(data.items()):
                lead = "" if (reset and position == 0) else pad
                label = self._format_yaml_value(key)
                if isinstance(value, (dict, list)):
                    if value:
                        file.write(f"{lead}{label}:\n")
                        self._write_yaml(file, value, indent + 1)
                    else:
                        # A bare "key:" would be read back as null.
                        empty = "{}" if isinstance(value, dict) else "[]"
                        file.write(f"{lead}{label}: {empty}\n")
                else:
                    file.write(
                        f"{lead}{label}: {self._format_yaml_value(value, indent + 1)}\n"
                    )
        elif isinstance(data, list):
            for position, item in enumerate(data):
                lead = "" if (reset and position == 0) else pad
                if isinstance(item, (dict, list)):
                    if item:
                        file.write(f"{lead}- ")
                        self._write_yaml(file, item, indent + 1, True)
                    else:
                        empty = "{}" if isinstance(item, dict) else "[]"
                        file.write(f"{lead}- {empty}\n")
                else:
                    file.write(
                        f"{lead}- {self._format_yaml_value(item, indent + 1)}\n"
                    )
        else:
            file.write(f"{self._format_yaml_value(data, indent + 1)}\n")

    @classmethod
    def _is_plain_yaml_scalar(cls, text: str) -> bool:
        """Tell whether ``text`` can be written unquoted and read back as-is.

        Deliberately conservative: only text that starts with a letter or
        underscore, is printable, has no surrounding whitespace, contains no
        ``:`` or ``#`` and is not a YAML keyword is considered plain.
        """
        if text.lower() in cls._YAML_RESERVED_WORDS:
            return False
        if not (text[0].isalpha() or text[0] == "_"):
            return False
        if text != text.strip() or not text.isprintable():
            return False
        return ":" not in text and "#" not in text

    def _format_yaml_value(self, value, indent: int = 0) -> str:
        """Render a scalar as YAML text.

        Args:
            value: bool, None, number or string to render.
            indent: nesting level used for the lines of a literal block.

        Returns:
            ``true``/``false``/``null``, ``str(value)`` for numbers, and for
            strings either the plain text or a double-quoted scalar when the
            plain form would be ambiguous.  ``_Literal`` strings become a
            ``|`` block; ``_Commented`` strings get a trailing ``# comment``.
        """
        if isinstance(value, bool):
            return "true" if value else "false"
        if value is None:
            return "null"
        if not isinstance(value, str):
            return str(value)

        # A block whose first line starts with whitespace would need an
        # explicit indentation indicator, so it falls back to quoting.
        if isinstance(value, self._Literal) and value and not value[0].isspace():
            pad = self._YAML_INDENT * indent
            block = "\n".join(
                f"{pad}{line}" if line else "" for line in value.splitlines()
            )
            return f"|\n{block}"

        text = str(value)
        if self._is_plain_yaml_scalar(text):
            rendered = text
        else:
            # JSON string syntax is valid YAML double-quoted syntax.
            rendered = json.dumps(text, ensure_ascii=False)

        # Collapse whitespace so the comment cannot spill onto another line.
        comment = " ".join(getattr(value, "comment", "").split())
        return f"{rendered} # {comment}" if comment else rendered
|