"""Generate Rasa project files (domain, NLU, stories, actions) from a flow definition."""

import os
import sys
import json
import logging
from pathlib import Path

# Make the project root importable so that ``config`` can be resolved.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from config import Config

# Module-level logger.
logger = logging.getLogger(__name__)

# One YAML nesting level.
_YAML_INDENT = "  "
# Plain scalars that YAML parsers read as null/boolean rather than as text.
_YAML_RESERVED_WORDS = frozenset(
    {"", "~", "null", "true", "false", "yes", "no", "on", "off", "y", "n"}
)
# Leading characters that are YAML syntax or that start a number/timestamp.
_YAML_LEADING_INDICATORS = frozenset("-+.?:,[]{}#&*!|>'\"%@`")
# Flow ``requestMethod`` value -> ``requests`` function name.
_SUPPORTED_HTTP_METHODS = {
    "GET": "get",
    "POST": "post",
    "PUT": "put",
    "DELETE": "delete",
}


class _CommentedStr(str):
    """String scalar that carries a trailing YAML comment for the writer."""

    def __new__(cls, value, comment=""):
        instance = super().__new__(cls, value)
        instance.comment = comment
        return instance


class RasaFileGenerator:
    """Service that turns a flow definition into Rasa configuration files.

    Every public ``generate_*`` method logs failures and re-raises them as a
    plain ``Exception`` (chained to the original error).
    """

    def __init__(self):
        """Read the output directory from ``Config`` and create the folders.

        Raises:
            Exception: if the configuration cannot be read or the directories
                cannot be created.
        """
        try:
            self.config = Config()
            self.output_dir = self.config.get("file_storage.rasa_files_dir", "./rasa_files")

            self._create_directories()
            logger.info(f"Rasa文件生成器初始化成功,输出目录: {self.output_dir}")
        except Exception as e:
            logger.error(f"初始化Rasa文件生成器失败: {str(e)}")
            raise Exception(f"初始化Rasa文件生成器失败: {str(e)}") from e

    def _load_flow_json(self, flow_json_path: str) -> dict:
        """Load and parse a ``flow.json`` file.

        Args:
            flow_json_path: path of the JSON file to read (UTF-8).

        Returns:
            The decoded JSON document.

        Raises:
            Exception: if the file is missing, is not valid JSON, or cannot
                be read for any other reason.
        """
        try:
            with open(flow_json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError as e:
            raise Exception(f"flow.json文件未找到: {flow_json_path}") from e
        except json.JSONDecodeError as e:
            raise Exception("flow.json文件格式无效") from e
        except Exception as e:
            raise Exception(f"加载flow.json文件失败: {str(e)}") from e

    def _create_directories(self) -> None:
        """Create the output directory and its nlu/stories/actions subfolders.

        Raises:
            Exception: if any directory cannot be created.
        """
        try:
            root = Path(self.output_dir)
            root.mkdir(parents=True, exist_ok=True)
            for name in ("nlu", "stories", "actions"):
                (root / name).mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise Exception(f"创建目录失败: {str(e)}") from e

    def generate_all_files(self, flow_json_str: str, flow_name: str) -> dict:
        """Generate every Rasa file for one flow.

        Args:
            flow_json_str: the flow definition as a JSON string.
            flow_name: name used for the generated story.

        Returns:
            A result dict. On success it holds ``success=True``, ``message``,
            ``output_dir`` and ``files`` (paths of the generated files); on
            failure it holds ``success=False`` and ``message``. This method
            does not raise.
        """
        try:
            flow_data = json.loads(flow_json_str)

            domain_path = self.generate_domain_file(flow_data)
            nlu_path = self.generate_nlu_file(flow_data)
            story_paths = self.generate_stories_files(flow_data, flow_name)
            action_path = self.generate_actions_file(flow_data)

            return {
                "success": True,
                "message": "所有文件生成成功",
                "output_dir": self.output_dir,
                "files": {
                    "domain": domain_path,
                    "nlu": nlu_path,
                    "stories": story_paths,
                    "actions": action_path,
                },
            }
        except OSError as e:
            # The generate_* methods wrap their errors in a plain Exception,
            # so this branch only sees OSErrors raised directly in this method.
            logger.error(f"文件系统错误导致文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"文件系统错误: {str(e)}",
            }
        except Exception as e:
            logger.error(f"文件生成失败: {str(e)}")
            return {
                "success": False,
                "message": f"生成文件时发生错误: {str(e)}",
            }

    def generate_domain_file(self, flow_data: dict) -> str:
        """Write ``domain.yml`` from the flow nodes.

        Intents come from ``intention`` nodes, actions from ``action`` and
        ``form`` nodes, and each ``collection`` node contributes a form, a
        ``form_<code>`` action and one text slot per form field.

        Args:
            flow_data: decoded flow definition; nodes are read from
                ``flowJson.nodes``.

        Returns:
            Path of the written file.

        Raises:
            Exception: if the file cannot be generated.
        """
        try:
            nodes = flow_data.get("flowJson", {}).get("nodes", [])

            intents = []
            actions = []
            forms = []
            slots = {}

            for node in nodes:
                node_type = node.get("type")
                properties = node.get("properties", {})
                code = properties.get("code")

                if node_type == "collection":
                    # Slots are taken from the form fields of collection nodes.
                    for field in properties.get("formFields") or []:
                        slot_name = field.get("slotName") or f"{code}_{field.get('entityType')}"
                        # NOTE(review): Rasa 3.x domains appear to require a
                        # ``mappings`` entry per slot - confirm and extend.
                        slots[slot_name] = {"type": "text"}

                if not code:
                    continue

                if node_type == "intention":
                    intents.append(code)
                elif node_type in ("action", "form"):
                    actions.append(code)
                elif node_type == "collection":
                    forms.append(code)
                    actions.append(f"form_{code}")

            # dict.fromkeys drops duplicates (several nodes may share a code)
            # while keeping first-seen order.
            domain_data = {
                "version": "3.1",
                "intents": list(dict.fromkeys(intents)),
                "slots": slots,
                "forms": {form_code: {} for form_code in forms},
                "responses": {},
                "actions": list(dict.fromkeys(actions)),
            }

            file_path = f"{self.output_dir}/domain.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, domain_data)

            logger.info(f"生成domain文件: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"生成domain文件失败: {str(e)}")
            raise Exception(f"生成domain文件失败: {str(e)}") from e

    @staticmethod
    def _format_sample_with_entities(sample) -> str:
        """Return a sample's text with entities marked as ``[text](label)``.

        A sample looks like::

            {
                "text": "我想定明天的酒店",
                "entities": [
                    {"text": "明天", "label": "DATE", "start": 3, "end": 4}
                ]
            }

        ``start`` and ``end`` are character offsets and ``end`` is inclusive
        (in the example, offsets 3 and 4 are the two characters of "明天").
        A plain string sample is returned unchanged.
        """
        if isinstance(sample, str):
            return sample

        text = sample.get("text") or ""
        entities = sample.get("entities") or []
        if not entities:
            return text

        # Replace from the rightmost entity backwards so that earlier offsets
        # stay valid while the text grows.
        ordered = sorted(entities, key=lambda entity: entity.get("start", 0), reverse=True)
        for entity in ordered:
            start = entity.get("start", 0)
            # ``end`` is inclusive; +1 turns it into an exclusive slice bound.
            end = min(entity.get("end", 0) + 1, len(text))
            if start < 0 or start >= end:
                # Empty or out-of-range span: nothing to annotate.
                continue
            label = entity.get("label", "UNKNOWN")
            text = f"{text[:start]}[{text[start:end]}]({label}){text[end:]}"

        return text

    def generate_nlu_file(self, flow_data: dict) -> str:
        """Write ``nlu/nlu.yml`` with one entry per ``intention`` node.

        Args:
            flow_data: decoded flow definition; nodes are read from
                ``flowJson.nodes``. Each intention node supplies ``code``
                (intent name), an optional ``desc`` and its ``samples``.

        Returns:
            Path of the written file.

        Raises:
            Exception: if the file cannot be generated.
        """
        try:
            nlu_data = {"version": "3.1", "nlu": []}

            nodes = flow_data.get("flowJson", {}).get("nodes", [])
            for node in nodes:
                if node.get("type") != "intention":
                    continue

                properties = node.get("properties", {})
                intent_name = properties.get("code")
                if not intent_name:
                    continue

                intent_desc = properties.get("desc")
                samples = properties.get("samples") or []

                formatted = [self._format_sample_with_entities(sample) for sample in samples]
                # Every line ends with "\n" so the writer always emits the
                # examples as a "|" block scalar, even for a single sample.
                examples_str = "".join(f"- {text}\n" for text in formatted if text)

                # The description is emitted as a YAML comment after the name.
                if intent_desc:
                    intent_value = _CommentedStr(intent_name, intent_desc)
                else:
                    intent_value = intent_name

                nlu_data["nlu"].append({"intent": intent_value, "examples": examples_str})

            file_path = f"{self.output_dir}/nlu/nlu.yml"
            with open(file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, nlu_data)

            logger.info(f"生成nlu文件: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"生成nlu文件失败: {str(e)}")
            raise Exception(f"生成nlu文件失败: {str(e)}") from e

    def generate_stories_files(self, flow_data: dict, flow_name: str) -> list[str]:
        """Write ``stories/stories.yml`` by walking the flow from its start node.

        Only the first outgoing edge of every node is followed, so branches of
        condition nodes are not expanded into separate stories.

        Args:
            flow_data: decoded flow definition; nodes and edges are read from
                ``flowJson.nodes`` and ``flowJson.edges``.
            flow_name: name given to the generated story.

        Returns:
            List with the path of the written file, or an empty list when the
            flow has no ``start`` node.

        Raises:
            Exception: if the file cannot be generated.
        """
        try:
            files = []

            nodes = flow_data.get("flowJson", {}).get("nodes", [])
            edges = flow_data.get("flowJson", {}).get("edges", [])

            # Node id -> node.
            node_map = {node.get("id"): node for node in nodes}

            # Source node id -> target node ids, in edge order.
            edge_map = {}
            for edge in edges:
                edge_map.setdefault(edge.get("sourceNodeId"), []).append(edge.get("targetNodeId"))

            start_node = next((node for node in nodes if node.get("type") == "start"), None)
            if not start_node:
                logger.warning("未找到开始节点")
                return files

            story_steps = []
            visited = set()
            current_node = start_node

            while current_node:
                current_node_id = current_node.get("id")
                if current_node_id in visited:
                    # A cycle in the flow would otherwise loop forever.
                    logger.warning(f"检测到流程环路,停止遍历: {current_node_id}")
                    break
                visited.add(current_node_id)

                node_type = current_node.get("type")
                code = current_node.get("properties", {}).get("code")

                if code:
                    if node_type == "intention":
                        story_steps.append({"intent": code})
                    elif node_type in ("action", "form", "condition"):
                        # Condition nodes are simplified to a single action.
                        story_steps.append({"action": code})
                    elif node_type == "collection":
                        story_steps.append({"action": f"form_{code}"})
                        # Form activation followed by deactivation.
                        # NOTE(review): Rasa story steps appear to expect
                        # ``active_loop: <form name>`` (a string) - confirm.
                        story_steps.append({"active_loop": {"name": code}})
                        story_steps.append({"active_loop": None})

                next_nodes = edge_map.get(current_node_id, [])
                if not next_nodes or node_type == "end":
                    break

                # Simplification: follow only the first outgoing edge.
                current_node = node_map.get(next_nodes[0])

            all_stories = {
                "version": "3.1",
                "stories": [{"story": flow_name, "steps": story_steps}],
            }

            merged_file_path = f"{self.output_dir}/stories/stories.yml"
            with open(merged_file_path, "w", encoding="utf-8") as f:
                self._write_yaml(f, all_stories)
            files.append(merged_file_path)

            logger.info(f"生成stories文件: {len(files)} 个文件")
            return files
        except Exception as e:
            logger.error(f"生成stories文件失败: {str(e)}")
            raise Exception(f"生成stories文件失败: {str(e)}") from e

    def generate_actions_file(self, flow_data: dict) -> str:
        """Write ``actions/actions.py`` with one Action class per flow action.

        ``action``, ``form`` and ``condition`` nodes that have a ``code`` each
        become a ``rasa_sdk`` ``Action`` subclass. Values taken from the flow
        are embedded as Python literals (``repr``), so quotes or newlines in
        them cannot break the generated module.

        Args:
            flow_data: decoded flow definition; nodes are read from
                ``flowJson.nodes``.

        Returns:
            Path of the written file.

        Raises:
            Exception: if the file cannot be generated.
        """
        try:
            nodes = flow_data.get("flowJson", {}).get("nodes", [])

            actions = [
                {"type": node.get("type"), "properties": node.get("properties", {})}
                for node in nodes
                if node.get("type") in ("action", "form", "condition")
                and node.get("properties", {}).get("code")
            ]

            if not actions:
                logger.warning("未找到任何自定义动作")

            code = [
                "from rasa_sdk import Action, Tracker",
                "from rasa_sdk.executor import CollectingDispatcher",
                "from rasa_sdk.events import SlotSet, ActiveLoop",
                "import requests",
                "import json\n",
            ]

            for action in actions:
                properties = action["properties"]
                action_type = action["type"]
                action_name = str(properties.get("code"))

                code.append(f"class {self._action_class_name(action_name)}(Action):")
                code.append("    def name(self) -> str:")
                code.append(f"        return {action_name!r}\n")
                code.append(
                    "    def run(self, dispatcher: CollectingDispatcher, "
                    "tracker: Tracker, domain: dict) -> list:"
                )

                if action_type == "action":
                    # Plain action node: reply with the configured text.
                    config_text = str(properties.get("configText") or "")
                    code.append(f"        dispatcher.utter_message(text={config_text!r})")
                    code.append("        return []\n")
                elif action_type == "form":
                    # Form node: call an HTTP API.
                    code.extend(self._render_form_run(properties))
                elif action_type == "condition":
                    # Condition node: placeholder body.
                    code.append("        # 处理条件逻辑")
                    code.append("        # 这里简化处理,实际应用中应根据条件执行不同的逻辑")
                    code.append("        return []\n")

            file_path = f"{self.output_dir}/actions/actions.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(code))

            logger.info(f"生成actions文件: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"生成actions文件失败: {str(e)}")
            raise Exception(f"生成actions文件失败: {str(e)}") from e

    @staticmethod
    def _action_class_name(action_name: str) -> str:
        """Build a valid class name (``Action`` + capitalised words) from a code."""
        # Anything that is not a letter or digit is treated as a separator.
        normalized = "".join(ch if ch.isalnum() else "_" for ch in action_name)
        return "Action" + "".join(word.capitalize() for word in normalized.split("_"))

    def _render_form_run(self, properties: dict) -> list[str]:
        """Return the generated ``run`` body lines for an API-calling form node."""
        lines = [
            "        # 构建请求头",
            "        headers = {}",
        ]

        for header in properties.get("headers") or []:
            key = header.get("key")
            if not key:
                continue
            value = header.get("value")
            value = "" if value is None else str(value)
            lines.append(f"        headers[{str(key)!r}] = {value!r}")
        lines.append("        headers[\"Content-Type\"] = \"application/json\"\n")

        # ``params`` is always defined because the request call below uses it.
        lines.append("        # 构建请求参数")
        lines.append("        params = {}")
        for param in properties.get("params") or []:
            param_name = param.get("name")
            entity = param.get("entity")
            if not param_name or not entity:
                continue
            lines.append(
                f"        params[{str(param_name)!r}] = tracker.get_slot({str(entity)!r})"
            )
        lines.append("")

        lines.append("        # 发送请求")
        lines.append("        try:")

        method = properties.get("requestMethod")
        url = properties.get("requestUrl")
        requester = _SUPPORTED_HTTP_METHODS.get(method) if isinstance(method, str) else None

        if requester is None:
            # Unsupported method: only report it, there is no response to handle.
            message = f"不支持的HTTP方法: {method}"
            lines.append(f"            dispatcher.utter_message(text={message!r})")
            lines.append("            return []")
        else:
            lines.append(
                f"            response = requests.{requester}"
                f"({url!r}, headers=headers, params=params)"
            )
            lines.append("            if response.status_code == 200:")
            lines.append("                result = response.json()")

            mappings = [
                mapping
                for mapping in properties.get("responseMappings") or []
                if mapping.get("responseField") and mapping.get("targetVar")
            ]
            if mappings:
                lines.append("                # 处理响应映射")
                lines.append("                slot_events = []")
                for mapping in mappings:
                    lines.extend(self._render_response_mapping(mapping))
                lines.append("                dispatcher.utter_message(text=str(result))")
                lines.append("                return slot_events")
            else:
                lines.append("                dispatcher.utter_message(text=str(result))")

            lines.append("            else:")
            lines.append(
                "                dispatcher.utter_message("
                "text=f\"API调用失败,状态码: {response.status_code}\")"
            )

        lines.append("        except Exception as e:")
        lines.append("            dispatcher.utter_message(text=f\"调用API时发生错误: {str(e)}\")\n")
        lines.append("        return []\n")
        return lines

    @staticmethod
    def _render_response_mapping(mapping: dict) -> list[str]:
        """Return generated lines that copy one response field into a slot."""
        field = str(mapping.get("responseField"))
        target = str(mapping.get("targetVar"))
        default = mapping.get("defaultValue")
        pad = " " * 16
        # The field name is echoed in a comment, so keep it on one line.
        field_label = " ".join(field.split())
        return [
            f"{pad}# 提取 {field_label}",
            f"{pad}value = {default!r}",
            f"{pad}try:",
            f"{pad}    # 简化的路径解析",
            f"{pad}    parts = {field!r}.split('.')",
            f"{pad}    temp = result",
            f"{pad}    for part in parts:",
            f"{pad}        if isinstance(temp, dict) and part in temp:",
            f"{pad}            temp = temp[part]",
            f"{pad}        elif isinstance(temp, list) and part.isdigit() and int(part) < len(temp):",
            f"{pad}            temp = temp[int(part)]",
            f"{pad}        else:",
            f"{pad}            raise Exception(\"路径无效\")",
            f"{pad}    value = temp",
            f"{pad}except Exception:",
            f"{pad}    pass",
            f"{pad}slot_events.append(SlotSet({target!r}, value))",
        ]

    def _write_yaml(self, file, data, indent: int = 0, reset: bool = False) -> None:
        """Write ``data`` to ``file`` as block-style YAML.

        Args:
            file: writable text file object.
            data: dict, list or scalar to serialise.
            indent: nesting level of ``data`` (two spaces per level).
            reset: True when the caller has already written the ``- `` list
                marker, so the first emitted line must not be indented again.

        Raises:
            Exception: if serialising or writing fails.
        """
        try:
            self._emit_yaml(file, data, indent, reset)
        except Exception as e:
            raise Exception(f"写入YAML数据失败: {str(e)}") from e

    def _emit_yaml(self, file, data, indent: int, continue_line: bool) -> None:
        """Recursively emit one YAML node (see ``_write_yaml`` for arguments)."""
        pad = _YAML_INDENT * indent
        # The first line continues a "- " marker when continue_line is set.
        first_lead = "" if continue_line else pad

        if isinstance(data, dict):
            if not data:
                file.write(f"{first_lead}{{}}\n")
                return
            for position, (key, value) in enumerate(data.items()):
                lead = pad if position else first_lead
                key_text = self._quote_if_needed(key) if isinstance(key, str) else str(key)
                if isinstance(value, (dict, list)):
                    if value:
                        file.write(f"{lead}{key_text}:\n")
                        self._emit_yaml(file, value, indent + 1, False)
                    else:
                        # Empty containers are written inline; a bare "key:"
                        # would be read back as null.
                        empty = "{}" if isinstance(value, dict) else "[]"
                        file.write(f"{lead}{key_text}: {empty}\n")
                else:
                    file.write(f"{lead}{key_text}: {self._format_yaml_value(value, indent + 1)}\n")
        elif isinstance(data, list):
            if not data:
                file.write(f"{first_lead}[]\n")
                return
            for position, item in enumerate(data):
                lead = pad if position else first_lead
                if isinstance(item, (dict, list)):
                    file.write(f"{lead}- ")
                    self._emit_yaml(file, item, indent + 1, True)
                else:
                    file.write(f"{lead}- {self._format_yaml_value(item, indent + 1)}\n")
        else:
            file.write(f"{first_lead}{self._format_yaml_value(data, indent + 1)}\n")

    @staticmethod
    def _quote_if_needed(text: str) -> str:
        """Return ``text`` as a plain YAML scalar, double-quoted when required."""
        needs_quotes = (
            text.lower() in _YAML_RESERVED_WORDS  # also covers the empty string
            or text != text.strip()
            or text[0] in _YAML_LEADING_INDICATORS
            or text[0].isdigit()  # numbers, dates: keep them strings
            or ":" in text
            or "#" in text
            or not text.isprintable()
        )
        # A JSON string literal is a valid YAML double-quoted scalar.
        return json.dumps(text, ensure_ascii=False) if needs_quotes else text

    @staticmethod
    def _is_block_safe(text: str) -> bool:
        """Tell whether ``text`` can be written as a ``|`` literal block."""
        body = text.rstrip("\n")
        if not body:
            return False
        lines = body.split("\n")
        first_content = next(line for line in lines if line)
        if first_content[0] in " \t":
            # Leading whitespace would confuse indentation auto-detection.
            return False
        if any(line and not line.strip() for line in lines):
            return False
        return all(ch == "\n" or ch.isprintable() for ch in text)

    def _format_yaml_value(self, value, indent: int = 0) -> str:
        """Format a scalar for YAML output.

        Args:
            value: None, bool, str or any other scalar (rendered via ``str``).
            indent: nesting level used to indent the lines of a multi-line
                string written as a ``|`` literal block.

        Returns:
            The scalar text, without a trailing newline.
        """
        if value is None:
            return "null"
        if isinstance(value, bool):
            return "true" if value else "false"
        if not isinstance(value, str):
            return str(value)

        text = str(value)
        comment = " ".join(str(getattr(value, "comment", "") or "").split())

        if "\n" in text and not comment and self._is_block_safe(text):
            body = text.rstrip("\n")
            trailing = len(text) - len(body)
            # Chomping indicator: strip, clip or keep the trailing newlines.
            chomp = "-" if trailing == 0 else ("" if trailing == 1 else "+")
            pad = _YAML_INDENT * indent
            lines = [f"{pad}{line}" if line else "" for line in body.split("\n")]
            lines.extend([""] * max(trailing - 1, 0))
            return f"|{chomp}\n" + "\n".join(lines)

        rendered = self._quote_if_needed(text)
        if comment:
            rendered = f"{rendered} # {comment}"
        return rendered