| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
import os
import sys
import json
import logging
from pathlib import Path

# Add the project root (the parent of this file's directory) to the Python
# search path so that the project-level ``config`` module can be imported.
sys.path.append(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..')
    )
)
from config import Config

# Module-level logger.
logger = logging.getLogger(__name__)
class RasaFileGenerator:
    """Generate Rasa project files (domain, NLU data, stories, actions) from a flow.

    A flow is a dict shaped like ``{"flowJson": {"nodes": [...], "edges": [...]}}``.
    Every node has an ``id``, a ``type`` and a ``properties`` dict; the
    ``code`` property is used as the identifier written to the Rasa files.
    Edges connect nodes through ``sourceNodeId`` / ``targetNodeId``.

    Public ``generate_*`` methods perform the file I/O and logging; the
    private ``_build_*`` / ``_render_*`` helpers are pure and only transform
    data, which keeps them easy to test.
    """

    class _RawYamlScalar(str):
        """Text that ``_format_yaml_value`` emits verbatim, without quoting.

        Used for values that intentionally carry a trailing YAML comment.
        """

    # Plain scalars that a YAML parser would read as null / bool / float.
    _YAML_RESERVED_WORDS = frozenset({
        "", "~", "null", "true", "false", "yes", "no", "on", "off",
        "y", "n", ".inf", "+.inf", ".nan",
    })
    # Characters that cannot safely start a plain (unquoted) YAML scalar.
    _YAML_LEADING_INDICATORS = frozenset("-?:,[]{}#&*!|>'\"%@`")

    # Import header written at the top of the generated actions module.
    _ACTIONS_FILE_HEADER = (
        "from rasa_sdk import Action, Tracker",
        "from rasa_sdk.executor import CollectingDispatcher",
        "from rasa_sdk.events import SlotSet, ActiveLoop",
        "import requests",
        "import json",
    )
    _SUPPORTED_HTTP_METHODS = ("GET", "POST", "PUT", "DELETE")

    def __init__(self):
        """Read the output directory from the configuration and create it.

        Raises:
            Exception: if the configuration cannot be read or the output
                directories cannot be created (original error is chained).
        """
        try:
            self.config = Config()
            self.output_dir = self.config.get("file_storage.rasa_files_dir", "./rasa_files")

            self._create_directories()

            logger.info(f"Rasa文件生成器初始化成功,输出目录: {self.output_dir}")

        except Exception as e:
            logger.error(f"初始化Rasa文件生成器失败: {str(e)}")
            raise Exception(f"初始化Rasa文件生成器失败: {str(e)}") from e

    def _load_flow_json(self, flow_json_path: str) -> dict:
        """Load and parse the flow JSON file at *flow_json_path*.

        Raises:
            Exception: if the file is missing, is not valid JSON, or cannot
                be read (original error is chained).
        """
        try:
            with open(flow_json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError as e:
            raise Exception(f"flow.json文件未找到: {flow_json_path}") from e
        except json.JSONDecodeError as e:
            raise Exception("flow.json文件格式无效") from e
        except Exception as e:
            raise Exception(f"加载flow.json文件失败: {str(e)}") from e

    def _create_directories(self) -> None:
        """Create the output directory and its nlu/stories/actions subfolders.

        Raises:
            Exception: if any directory cannot be created.
        """
        try:
            root = Path(self.output_dir)
            for directory in (root, root / "nlu", root / "stories", root / "actions"):
                directory.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise Exception(f"创建目录失败: {str(e)}") from e

    def generate_all_files(self, flow_json_str: str, flow_name: str) -> dict:
        """Generate every Rasa file for one flow.

        Args:
            flow_json_str: the flow definition as a JSON string.
            flow_name: name written as the story name in stories.yml.

        Returns:
            On success ``{"success": True, "message", "output_dir", "files"}``
            where ``files`` maps ``domain`` / ``nlu`` / ``actions`` to a path
            and ``stories`` to a list of paths.  On failure
            ``{"success": False, "message"}``; errors are never raised.
        """
        try:
            flow_data = json.loads(flow_json_str)

            domain_path = self.generate_domain_file(flow_data)
            nlu_path = self.generate_nlu_file(flow_data)
            story_paths = self.generate_stories_files(flow_data, flow_name)
            action_path = self.generate_actions_file(flow_data)

            return {
                "success": True,
                "message": "所有文件生成成功",
                "output_dir": self.output_dir,
                "files": {
                    "domain": domain_path,
                    "nlu": nlu_path,
                    "stories": story_paths,
                    "actions": action_path
                }
            }
        except OSError as e:
            logger.error(f"文件系统错误导致文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"文件系统错误: {str(e)}",
            }
        except Exception as e:
            logger.error(f"文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"生成文件时发生错误: {str(e)}",
            }

    # ------------------------------------------------------------------
    # Flow helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _flow_nodes(flow_data: dict) -> list:
        """Return the flow's node dicts, tolerating missing or null sections."""
        flow_json = flow_data.get("flowJson") or {}
        return [node for node in flow_json.get("nodes") or [] if isinstance(node, dict)]

    # ------------------------------------------------------------------
    # domain.yml
    # ------------------------------------------------------------------

    def _build_domain_data(self, flow_data: dict) -> dict:
        """Build the domain mapping (intents, slots, forms, actions) for a flow.

        ``intention`` nodes become intents, ``action`` and ``form`` nodes
        become actions, and ``collection`` nodes become a form plus a
        ``form_<code>`` action.  Each form field of a collection node yields
        a ``text`` slot named by ``slotName`` or ``<code>_<entityType>``.
        """
        intents, actions, forms, slots = [], [], {}, {}

        for node in self._flow_nodes(flow_data):
            node_type = node.get("type")
            properties = node.get("properties") or {}
            code = properties.get("code")

            if node_type == "collection":
                for field in properties.get("formFields") or []:
                    slot_name = field.get("slotName")
                    entity_type = field.get("entityType")
                    # Only fall back to the derived name when both parts
                    # exist; otherwise the slot would be named "None_...".
                    if not slot_name and code and entity_type:
                        slot_name = f"{code}_{entity_type}"
                    if slot_name:
                        # NOTE(review): every slot is typed "text" and has no
                        # "mappings" entry; confirm this matches the target
                        # Rasa version's domain requirements.
                        slots[slot_name] = {"type": "text"}

            if not code:
                continue
            if node_type == "intention":
                intents.append(code)
            elif node_type in ("action", "form"):
                actions.append(code)
            elif node_type == "collection":
                forms[code] = {}
                actions.append(f"form_{code}")

        return {
            "version": "3.1",
            # dict.fromkeys drops duplicates while keeping first-seen order.
            "intents": list(dict.fromkeys(intents)),
            "slots": slots,
            "forms": forms,
            "responses": {},
            "actions": list(dict.fromkeys(actions)),
        }

    def generate_domain_file(self, flow_data: dict) -> str:
        """Write ``domain.yml`` for *flow_data* and return its path.

        Raises:
            Exception: if the data cannot be built or the file not written.
        """
        try:
            domain_data = self._build_domain_data(flow_data)

            file_path = f"{self.output_dir}/domain.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, domain_data)

            logger.info(f"生成domain文件: {file_path}")
            return file_path

        except Exception as e:
            logger.error(f"生成domain文件失败: {str(e)}")
            raise Exception(f"生成domain文件失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # nlu.yml
    # ------------------------------------------------------------------

    @staticmethod
    def _annotate_sample(sample) -> str:
        """Return the sample text with entities marked as ``[text](LABEL)``.

        *sample* is either a plain string or a dict such as
        ``{"text": "我想定明天的酒店", "entities": [{"label": "DATE",
        "start": 3, "end": 4}]}``.  ``start`` and ``end`` are character
        offsets and ``end`` is inclusive (in the example above, 3..4 covers
        the two characters "明天").
        """
        if isinstance(sample, str):
            return sample

        text = sample.get("text") or ""
        entities = sample.get("entities") or []

        # Work from the rightmost entity to the leftmost so that inserting
        # markup never shifts the offsets of entities still to be handled.
        ordered = sorted(entities, key=lambda item: item.get("start") or 0, reverse=True)
        for entity in ordered:
            start = entity.get("start") or 0
            end = (entity.get("end") or 0) + 1  # inclusive end -> slice end
            if not 0 <= start < end <= len(text):
                continue  # offsets outside the text cannot be annotated
            label = entity.get("label", "UNKNOWN")
            text = f"{text[:start]}[{text[start:end]}]({label}){text[end:]}"

        return text

    def _build_nlu_data(self, flow_data: dict) -> dict:
        """Build the NLU mapping: one entry per ``intention`` node with a code.

        ``examples`` is a newline-terminated list of ``- sample`` lines (or
        an empty string when the node has no samples).  When the node has a
        ``desc`` it is appended to the intent line as a YAML comment.
        """
        entries = []

        for node in self._flow_nodes(flow_data):
            if node.get("type") != "intention":
                continue
            properties = node.get("properties") or {}
            intent_name = properties.get("code")
            if not intent_name:
                continue

            example_lines = []
            for sample in properties.get("samples") or []:
                annotated = self._annotate_sample(sample)
                # One example per line: fold embedded line breaks into spaces.
                example_lines.append(f"- {' '.join(annotated.splitlines())}")
            examples = "".join(f"{line}\n" for line in example_lines)

            intent_value = intent_name
            intent_desc = properties.get("desc")
            if intent_desc:
                comment = " ".join(str(intent_desc).split())
                # Raw scalar: the "# ..." part must stay an unquoted comment.
                intent_value = self._RawYamlScalar(
                    f"{self._format_yaml_value(intent_name)} # {comment}"
                )

            entries.append({"intent": intent_value, "examples": examples})

        return {"version": "3.1", "nlu": entries}

    def generate_nlu_file(self, flow_data: dict) -> str:
        """Write ``nlu/nlu.yml`` for *flow_data* and return its path.

        Raises:
            Exception: if the data cannot be built or the file not written.
        """
        try:
            nlu_data = self._build_nlu_data(flow_data)

            file_path = f"{self.output_dir}/nlu/nlu.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, nlu_data)

            logger.info(f"生成nlu文件: {file_path}")
            return file_path

        except Exception as e:
            logger.error(f"生成nlu文件失败: {str(e)}")
            raise Exception(f"生成nlu文件失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # stories.yml
    # ------------------------------------------------------------------

    @staticmethod
    def _steps_for_node(node: dict) -> list:
        """Return the story steps contributed by a single flow node."""
        code = (node.get("properties") or {}).get("code")
        if not code:
            return []

        node_type = node.get("type")
        if node_type == "intention":
            return [{"intent": code}]
        if node_type in ("action", "form", "condition"):
            # Condition nodes are simplified to a single action step.
            return [{"action": code}]
        if node_type == "collection":
            # Run the form action, mark the loop active, then deactivate it.
            # The active_loop value is the form name itself.
            return [
                {"action": f"form_{code}"},
                {"active_loop": code},
                {"active_loop": None},
            ]
        return []

    def _build_story_steps(self, flow_data: dict) -> list:
        """Walk the flow from its ``start`` node and collect story steps.

        Only the first outgoing edge of each node is followed (branches are
        not expanded).  The walk stops at an ``end`` node, at a node without
        outgoing edges, at an unknown target, or when a node is reached a
        second time, so cyclic flows terminate.  Returns an empty list when
        the flow has no start node.
        """
        flow_json = flow_data.get("flowJson") or {}
        nodes = self._flow_nodes(flow_data)
        node_map = {node.get("id"): node for node in nodes}

        successors = {}
        for edge in flow_json.get("edges") or []:
            successors.setdefault(edge.get("sourceNodeId"), []).append(edge.get("targetNodeId"))

        current = next((node for node in nodes if node.get("type") == "start"), None)
        steps = []
        visited = set()

        while current:
            node_id = current.get("id")
            if node_id in visited:
                break  # cycle: this node was already emitted
            visited.add(node_id)

            steps.extend(self._steps_for_node(current))

            targets = successors.get(node_id)
            if not targets or current.get("type") == "end":
                break
            current = node_map.get(targets[0])

        return steps

    def generate_stories_files(self, flow_data: dict, flow_name: str) -> list[str]:
        """Write ``stories/stories.yml`` with one story named *flow_name*.

        Returns:
            A list with the written file path, or an empty list (and no
            file) when the flow has no ``start`` node.

        Raises:
            Exception: if the story cannot be built or the file not written.
        """
        try:
            has_start = any(
                node.get("type") == "start" for node in self._flow_nodes(flow_data)
            )
            if not has_start:
                logger.warning("未找到开始节点")
                return []

            all_stories = {
                "version": "3.1",
                "stories": [
                    {"story": flow_name, "steps": self._build_story_steps(flow_data)}
                ],
            }

            merged_file_path = f"{self.output_dir}/stories/stories.yml"
            with open(merged_file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, all_stories)

            files = [merged_file_path]
            logger.info(f"生成stories文件: {len(files)} 个文件")
            return files

        except Exception as e:
            logger.error(f"生成stories文件失败: {str(e)}")
            raise Exception(f"生成stories文件失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # actions.py
    # ------------------------------------------------------------------

    def _collect_custom_actions(self, flow_data: dict) -> list:
        """Return ``{"type", "properties"}`` for each action/form/condition node.

        Nodes without a code are skipped; when several nodes share a code
        only the first is kept so the generated module has one class each.
        """
        actions = []
        seen = set()
        for node in self._flow_nodes(flow_data):
            node_type = node.get("type")
            if node_type not in ("action", "form", "condition"):
                continue
            properties = node.get("properties") or {}
            action_name = properties.get("code")
            if not action_name or action_name in seen:
                continue
            seen.add(action_name)
            actions.append({"type": node_type, "properties": properties})
        return actions

    @staticmethod
    def _action_class_name(action_name) -> str:
        """Derive a valid class name: ``get_user`` -> ``ActionGetUser``."""
        # Any character that cannot appear inside an identifier is treated
        # as a word separator, exactly like "_".
        cleaned = "".join(
            ch if ("_" + ch).isidentifier() else "_" for ch in str(action_name)
        )
        return "Action" + "".join(word.capitalize() for word in cleaned.split("_"))

    @staticmethod
    def _render_response_mapping(mapping: dict) -> list:
        """Render the code that copies one response field into a slot.

        The generated code walks the dotted ``responseField`` path through
        dicts and lists and falls back to ``defaultValue`` when the path
        does not resolve.
        """
        field = str(mapping.get("responseField") or "")
        target = str(mapping.get("targetVar"))
        default = mapping.get("defaultValue")
        # repr() turns the values into Python literals, so quotes, newlines
        # or backslashes in the flow cannot break or alter the generated code.
        return [
            f"# 提取 {field!r}",
            f"value = {default!r}",
            "try:",
            "    # 简化的路径解析",
            f"    parts = {field!r}.split('.')",
            "    temp = result",
            "    for part in parts:",
            "        if isinstance(temp, dict) and part in temp:",
            "            temp = temp[part]",
            "        elif isinstance(temp, list) and part.isdigit() and int(part) < len(temp):",
            "            temp = temp[int(part)]",
            "        else:",
            "            raise Exception(\"路径无效\")",
            "    value = temp",
            "except Exception:",
            "    pass",
            f"slot_events.append(SlotSet({target!r}, value))",
        ]

    def _render_api_call_body(self, properties: dict) -> list:
        """Render the ``run`` body of a ``form`` node (an HTTP API call).

        Lines are returned relative to the body's own indentation.
        """
        raw_method = properties.get("requestMethod")
        method = str(raw_method or "").upper()
        if method not in self._SUPPORTED_HTTP_METHODS:
            message = f"不支持的HTTP方法: {raw_method}"
            return [f"dispatcher.utter_message(text={message!r})", "return []"]

        lines = ["# 构建请求头", "headers = {}"]
        for header in properties.get("headers") or []:
            key = header.get("key")
            if not key:
                continue
            value = header.get("value")
            value_text = "" if value is None else str(value)
            lines.append(f"headers[{str(key)!r}] = {value_text!r}")
        lines.append("headers[\"Content-Type\"] = \"application/json\"")
        lines.append("")

        # ``params`` is always defined because the request below always
        # passes it, even when the node declares no parameters.
        lines.extend(["# 构建请求参数", "params = {}"])
        for param in properties.get("params") or []:
            param_name = param.get("name")
            entity = param.get("entity")
            if not param_name or not entity:
                continue
            lines.append(f"params[{str(param_name)!r}] = tracker.get_slot({str(entity)!r})")
        lines.append("")

        url = str(properties.get("requestUrl") or "")
        # NOTE(review): parameters are sent as query-string ``params`` for
        # every method, including POST/PUT; confirm the APIs expect that.
        lines.extend([
            "# 发送请求",
            "try:",
            f"    response = requests.{method.lower()}({url!r}, headers=headers, params=params)",
            "    if response.status_code == 200:",
            "        result = response.json()",
        ])

        mappings = [
            mapping for mapping in properties.get("responseMappings") or []
            if mapping.get("targetVar")
        ]
        if mappings:
            lines.extend(["        # 处理响应映射", "        slot_events = []"])
            for mapping in mappings:
                lines.extend(
                    f"        {line}" for line in self._render_response_mapping(mapping)
                )
            lines.append("        dispatcher.utter_message(text=str(result))")
            lines.append("        return slot_events")
        else:
            lines.append("        dispatcher.utter_message(text=str(result))")

        lines.extend([
            "    else:",
            "        dispatcher.utter_message(text=f\"API调用失败,状态码: {response.status_code}\")",
            "except Exception as e:",
            "    dispatcher.utter_message(text=f\"调用API时发生错误: {str(e)}\")",
            "",
            "return []",
        ])
        return lines

    def _render_action_class(self, action_type: str, properties: dict) -> list:
        """Render one ``Action`` subclass as a list of source lines."""
        action_name = str(properties.get("code"))
        lines = [
            f"class {self._action_class_name(action_name)}(Action):",
            "    def name(self) -> str:",
            f"        return {action_name!r}",
            "",
            "    def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: dict) -> list:",
        ]

        if action_type == "action":
            # Plain action node: reply with the configured text.
            config_text = str(properties.get("configText") or "")
            body = [f"dispatcher.utter_message(text={config_text!r})", "return []"]
        elif action_type == "form":
            body = self._render_api_call_body(properties)
        else:
            # Condition node: placeholder body only.
            body = [
                "# 处理条件逻辑",
                "# 这里简化处理,实际应用中应根据条件执行不同的逻辑",
                "return []",
            ]

        lines.extend(f"        {line}" if line else "" for line in body)
        return lines

    def _render_actions_source(self, actions: list) -> str:
        """Render the whole actions module for *actions* as source text."""
        lines = list(self._ACTIONS_FILE_HEADER)
        for action in actions:
            lines.extend(["", ""])
            lines.extend(self._render_action_class(action.get("type"), action.get("properties") or {}))
        return "\n".join(lines) + "\n"

    def generate_actions_file(self, flow_data: dict) -> str:
        """Write ``actions/actions.py`` with one class per custom action.

        The file is written even when the flow has no custom action (it
        then only contains the import header).

        Raises:
            Exception: if the source cannot be built or the file not written.
        """
        try:
            actions = self._collect_custom_actions(flow_data)
            if not actions:
                logger.warning("未找到任何自定义动作")

            file_path = f"{self.output_dir}/actions/actions.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(self._render_actions_source(actions))

            logger.info(f"生成actions文件: {file_path}")
            return file_path

        except Exception as e:
            logger.error(f"生成actions文件失败: {str(e)}")
            raise Exception(f"生成actions文件失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # Minimal YAML emitter
    # ------------------------------------------------------------------

    def _write_yaml(self, file, data, indent: int = 0, reset: bool = False) -> None:
        """Write *data* (nested dicts, lists and scalars) to *file* as YAML.

        Args:
            file: writable text file object.
            data: the value to serialise.
            indent: nesting level of the output; one level is two spaces.
            reset: when true, the first line is written without its leading
                indentation, for callers that already wrote a ``- `` prefix.

        Raises:
            Exception: if serialising or writing fails.
        """
        try:
            lines = self._yaml_lines(data, indent)
            if reset and lines:
                lines[0] = lines[0].lstrip(" ")
            file.write("".join(f"{line}\n" for line in lines))
        except Exception as e:
            raise Exception(f"写入YAML数据失败: {str(e)}") from e

    def _yaml_lines(self, data, indent: int) -> list:
        """Return the YAML lines for *data* at nesting level *indent*."""
        pad = "  " * indent

        if isinstance(data, dict):
            if not data:
                return [f"{pad}{{}}"]
            lines = []
            for key, value in data.items():
                prefix = f"{pad}{self._format_yaml_value(key)}:"
                lines.extend(self._yaml_entry(prefix, value, indent))
            return lines

        if isinstance(data, list):
            if not data:
                return [f"{pad}[]"]
            lines = []
            for item in data:
                if isinstance(item, (dict, list)) and item:
                    # Render the item one level deeper, then put the dash in
                    # front of its first line so later lines stay aligned.
                    nested = self._yaml_lines(item, indent + 1)
                    nested[0] = f"{pad}- {nested[0].lstrip(' ')}"
                    lines.extend(nested)
                else:
                    lines.extend(self._yaml_entry(f"{pad}-", item, indent))
            return lines

        return [f"{pad}{self._format_yaml_value(data)}"]

    def _yaml_entry(self, prefix: str, value, indent: int) -> list:
        """Return the lines for *value* introduced by *prefix* (``key:`` or ``-``)."""
        if isinstance(value, (dict, list)):
            if not value:
                # Explicit empty collection instead of an implicit null.
                return [f"{prefix} {'{}' if isinstance(value, dict) else '[]'}"]
            return [prefix, *self._yaml_lines(value, indent + 1)]

        is_multiline_text = (
            isinstance(value, str)
            and not isinstance(value, self._RawYamlScalar)
            and "\n" in value
        )
        if is_multiline_text:
            body = value[:-1] if value.endswith("\n") else value
            rows = body.split("\n")
            block_safe = (
                bool(rows[0][:1].strip())  # first line non-empty, no leading space
                and bool(rows[-1])         # at most one trailing newline
                and all(ch.isprintable() or ch == "\t" for row in rows for ch in row)
            )
            if block_safe:
                # "|" keeps the single trailing newline, "|-" means none.
                header = "|" if value.endswith("\n") else "|-"
                body_pad = "  " * (indent + 1)
                return [
                    f"{prefix} {header}",
                    *(f"{body_pad}{row}" if row else "" for row in rows),
                ]

        return [f"{prefix} {self._format_yaml_value(value)}"]

    def _needs_yaml_quotes(self, text: str) -> bool:
        """Return True when *text* would be misread as a plain YAML scalar."""
        if text.lower() in self._YAML_RESERVED_WORDS:
            return True
        if text != text.strip() or not text.isprintable():
            return True
        if text[0] in self._YAML_LEADING_INDICATORS:
            return True
        if ": " in text or " #" in text or text.endswith(":"):
            return True
        # Strings that look like numbers (e.g. "3.1") must stay strings.
        try:
            float(text.replace("_", ""))
            return True
        except ValueError:
            pass
        try:
            int(text, 0)
            return True
        except ValueError:
            return False

    def _format_yaml_value(self, value) -> str:
        """Format a scalar as single-line YAML text.

        Booleans become ``true``/``false`` and ``None`` becomes ``null``.
        Strings are written plain when that is unambiguous and as a
        double-quoted (JSON-escaped) scalar otherwise.  Any other value is
        converted with ``str``.
        """
        if isinstance(value, self._RawYamlScalar):
            return str(value)
        if isinstance(value, bool):
            return "true" if value else "false"
        if value is None:
            return "null"
        if isinstance(value, str):
            if self._needs_yaml_quotes(value):
                return json.dumps(value, ensure_ascii=False)
            return value
        return str(value)
|