Files
雷雨 8635b84b2d init
2025-12-15 22:05:56 +08:00

1170 lines
39 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from langchain_core.tools import tool
import json
from pydantic import BaseModel, Field
import logging
import sys
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import numpy as np
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
import pymysql
from pymysql.cursors import DictCursor
# 添加父目录到路径以导入config_loader
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config_loader import get_config_loader
# 配置日志
logger = logging.getLogger(__name__)
# 全局数据库配置
_db_config = None
def get_db_config():
"""获取数据库配置"""
global _db_config
if _db_config is None:
try:
config_loader = get_config_loader(strict_mode=False)
config = config_loader.config
_db_config = config.get("database", {}).get("starrocks", {})
logger.info("✅ 数据库配置加载成功")
except Exception as e:
logger.error(f"❌ 数据库配置加载失败: {e}")
_db_config = {}
return _db_config
def convert_numpy_types(obj):
"""
递归转换 numpy 类型为 Python 原生类型,使其可以被 JSON 序列化
Args:
obj: 任意对象
Returns:
转换后的对象
"""
if isinstance(obj, dict):
return {key: convert_numpy_types(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_numpy_types(item) for item in obj]
elif isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return obj
def get_db_connection():
"""获取数据库连接"""
try:
db_config = get_db_config()
connection = pymysql.connect(
host=db_config.get("host", "localhost"),
port=db_config.get("port", 9030),
user=db_config.get("user", "root"),
password=db_config.get("password", ""),
database=db_config.get("database", "smart_home"),
charset=db_config.get("charset", "utf8mb4"),
cursorclass=DictCursor,
autocommit=True,
connect_timeout=10,
)
return connection
except Exception as e:
logger.error(f"❌ 数据库连接失败: {e}")
return None
def extract_time_features(dt: datetime) -> Dict[str, float]:
"""
从日期时间中提取特征用于聚类
Args:
dt: 日期时间对象
Returns:
包含时间特征的字典
"""
return {
"hour": dt.hour,
"minute": dt.minute,
"day_of_week": dt.weekday(), # 0=周一, 6=周日
"is_weekend": 1 if dt.weekday() >= 5 else 0,
"is_morning": 1 if 6 <= dt.hour < 12 else 0, # 早上 6-12点
"is_afternoon": 1 if 12 <= dt.hour < 18 else 0, # 下午 12-18点
"is_evening": 1 if 18 <= dt.hour < 22 else 0, # 晚上 18-22点
"is_night": 1 if dt.hour >= 22 or dt.hour < 6 else 0, # 夜晚 22-6点
}
def prepare_clustering_data(operations: List[Dict[str, Any]]) -> tuple:
"""
准备用于聚类的数据
Args:
operations: 设备操作记录列表
Returns:
(特征矩阵, 原始数据列表, 特征名称列表)
"""
features_list = []
data_records = []
for op in operations:
try:
created_at = op.get("created_at")
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at)
# 提取时间特征
time_features = extract_time_features(created_at)
# 设备类型编码简单的one-hot编码
device_type = op.get("device_type", "")
device_features = {
"is_ac": 1 if device_type == "air_conditioner" else 0,
"is_cleaner": 1 if device_type == "air_cleaner" else 0,
"is_lamp": 1 if device_type == "bedside_lamp" else 0,
}
# 合并所有特征
all_features = {**time_features, **device_features}
features_list.append(list(all_features.values()))
# 保存原始数据记录
data_records.append(
{
"operation": op,
"time_features": time_features,
"device_type": device_type,
}
)
except Exception as e:
logger.warning(f"⚠️ 处理操作记录失败: {e}")
continue
feature_names = list(all_features.keys()) if features_list else []
X = np.array(features_list) if features_list else np.array([])
return X, data_records, feature_names
def cluster_scenes_with_gmm(X: np.ndarray, n_components: int = 5) -> np.ndarray:
"""
使用高斯混合模型(GMM)进行场景聚类
Args:
X: 特征矩阵
n_components: 聚类数量
Returns:
聚类标签数组
"""
if len(X) == 0:
return np.array([])
# 标准化特征
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 使用GMM进行聚类
actual_components = min(n_components, len(X)) # 确保聚类数不超过样本数
gmm = GaussianMixture(
n_components=actual_components,
covariance_type="full",
max_iter=100,
random_state=42,
)
labels = gmm.fit_predict(X_scaled)
# 统计每个场景的样本数
unique_labels, counts = np.unique(labels, return_counts=True)
logger.info(f"✅ GMM聚类完成: {len(X)}个样本被分为{actual_components}个场景")
# 打印详细的聚类结果
for label, count in zip(unique_labels, counts):
percentage = (count / len(X)) * 100
logger.debug(f" 场景 {label}: {count} 个样本 ({percentage:.1f}%)")
return labels
def analyze_scene_patterns(
data_records: List[Dict], labels: np.ndarray
) -> Dict[int, Dict[str, Any]]:
"""
分析每个场景的模式特征
Args:
data_records: 数据记录列表
labels: 聚类标签
Returns:
场景分析结果字典 {场景ID: 场景特征}
"""
scenes = {}
for label in np.unique(labels):
# 获取该场景的所有记录
scene_indices = np.where(labels == label)[0]
scene_records = [data_records[i] for i in scene_indices]
# 统计时间特征
hours = [rec["time_features"]["hour"] for rec in scene_records]
avg_hour = np.mean(hours)
# 判断时段
if 6 <= avg_hour < 12:
time_period = "早上"
elif 12 <= avg_hour < 18:
time_period = "下午"
elif 18 <= avg_hour < 22:
time_period = "晚上"
else:
time_period = "夜晚"
# 统计设备操作
device_actions = {}
for rec in scene_records:
op = rec["operation"]
device_type = op.get("device_type", "unknown")
action = op.get("action", "unknown")
if device_type not in device_actions:
device_actions[device_type] = []
# 解析参数
params_str = op.get("parameters")
params = {}
if params_str:
try:
params = (
json.loads(params_str)
if isinstance(params_str, str)
else params_str
)
except:
pass
device_actions[device_type].append({"action": action, "parameters": params})
# 生成场景描述
scene_name = f"场景{label+1}_{time_period}"
scenes[label] = {
"scene_id": int(label), # 转换 numpy.int64 为 int
"scene_name": scene_name,
"time_period": time_period,
"avg_hour": float(round(avg_hour, 1)), # 转换 numpy.float64 为 float
"occurrence_count": len(scene_records),
"device_actions": device_actions,
"description": generate_scene_description(time_period, device_actions),
}
return scenes
def generate_scene_description(
time_period: str, device_actions: Dict[str, List]
) -> str:
"""
生成场景描述
Args:
time_period: 时间段
device_actions: 设备操作字典
Returns:
场景描述文本
"""
desc_parts = [f"{time_period}时段,用户通常会:"]
for device_type, actions in device_actions.items():
device_name_map = {
"air_conditioner": "空调",
"air_cleaner": "空气净化器",
"bedside_lamp": "床头灯",
}
device_name = device_name_map.get(device_type, device_type)
# 统计最常见的操作
action_counts = {}
for action_info in actions:
action = action_info["action"]
action_counts[action] = action_counts.get(action, 0) + 1
most_common_action = (
max(action_counts, key=action_counts.get) if action_counts else "操作"
)
count = action_counts[most_common_action]
desc_parts.append(f"- 对{device_name}执行「{most_common_action}」({count}次)")
return "\n".join(desc_parts)
class SceneQueryArgs(BaseModel):
user_query: str = Field(
...,
description="用户的场景描述或查询,例如:'我要睡觉了''起床了''打开空调'",
)
system_user_id: int = Field(
default=1, description="系统用户ID用于查询该用户的历史数据"
)
days: int = Field(default=30, description="查询最近N天的数据默认30天")
@tool(
"query_user_scene_habits",
args_schema=SceneQueryArgs,
description="基于用户历史行为数据使用GMM算法分析场景习惯为用户查询提供个性化设备操作建议",
)
def query_user_scene_habits(
user_query: str, system_user_id: int = 1, days: int = 30
) -> str:
"""
查询用户场景习惯使用GMM进行场景聚类分析
Args:
user_query: 用户查询或场景描述
system_user_id: 系统用户ID
days: 查询最近N天的数据
Returns:
JSON格式的分析结果
"""
try:
# 连接数据库
conn = get_db_connection()
if not conn:
return json.dumps(
{
"status": "error",
"message": "数据库连接失败",
"recommendation": None,
},
ensure_ascii=False,
indent=2,
)
try:
# 查询设备操作历史
start_date = (datetime.now() - timedelta(days=days)).strftime(
"%Y-%m-%d %H:%M:%S"
)
sql = """
SELECT id, system_user_id, created_at, context_id, device_type,
device_name, action, parameters, success
FROM device_operations
WHERE system_user_id = %s
AND created_at >= %s
AND success = TRUE
ORDER BY created_at DESC
"""
with conn.cursor() as cursor:
cursor.execute(sql, (system_user_id, start_date))
operations = cursor.fetchall()
print("\n" + "="*80)
print(f"🔍 【数据挖掘开始】用户习惯分析")
print("="*80)
print(f"👤 用户ID: {system_user_id}")
print(f"📅 查询时间范围: 最近 {days}")
print(f"📊 查询到 {len(operations)} 条设备操作记录")
print(f"🎯 用户查询: {user_query}")
logger.info(f"📊 查询到 {len(operations)} 条设备操作记录")
# 检查数据量
if len(operations) < 10:
return json.dumps(
{
"status": "insufficient_data",
"message": f"历史数据不足(仅{len(operations)}条记录),建议使用通用最佳实践",
"user_query": user_query,
"data_count": len(operations),
"recommendation": None,
},
ensure_ascii=False,
indent=2,
)
# 准备聚类数据
print("\n📦 【准备聚类数据】")
X, data_records, feature_names = prepare_clustering_data(operations)
print(f" ✓ 特征矩阵维度: {X.shape if len(X) > 0 else '(0,0)'}")
print(f" ✓ 特征列表: {', '.join(feature_names)}")
if len(X) == 0:
print(" ❌ 无法准备聚类数据")
return json.dumps(
{
"status": "error",
"message": "无法准备聚类数据",
"recommendation": None,
},
ensure_ascii=False,
indent=2,
)
# 执行GMM聚类
n_clusters = min(5, len(X) // 5) # 动态确定聚类数
n_clusters = max(2, n_clusters) # 至少2个场景
print(f"\n🤖 【GMM聚类分析】")
print(f" 样本数量: {len(X)}")
print(f" 目标场景数: {n_clusters}")
print(f" ⏳ 正在执行高斯混合模型聚类...")
labels = cluster_scenes_with_gmm(X, n_components=n_clusters)
print(f" ✓ 聚类完成!识别出 {len(np.unique(labels))} 个使用场景")
# 分析场景模式
print(f"\n📈 【场景模式分析】")
scenes = analyze_scene_patterns(data_records, labels)
# 打印每个场景的详细信息
for scene_id, scene in scenes.items():
print(f"\n 🎬 场景 #{scene_id + 1}: {scene['time_period']} {scene.get('weekend_info', '')}")
print(f" 📍 时间段: {scene['time_period']} (平均 {scene['avg_hour']:.1f}:00)")
print(f" 📊 出现次数: {scene['occurrence_count']}")
print(f" 🔧 设备操作:")
for device, actions in scene['device_actions'].items():
print(f"{device}:")
for action_info in actions[:3]: # 只显示前3个操作
action_str = f" - {action_info['action']}"
if action_info.get('parameters'):
params_str = ', '.join([f"{k}={v}" for k, v in list(action_info['parameters'].items())[:2]])
action_str += f" ({params_str})"
print(action_str)
if len(actions) > 3:
print(f" ... 还有 {len(actions)-3} 个操作")
# 根据用户查询匹配最相关的场景
print(f"\n🎯 【场景匹配】")
print(f" 查询: {user_query}")
matched_scene = match_scene_by_query(user_query, scenes)
if matched_scene:
print(f" ✓ 匹配到场景: {matched_scene['time_period']} - 出现 {matched_scene['occurrence_count']}")
else:
print(f" ⚠️ 未找到匹配场景")
# 生成推荐
recommendation = None
if matched_scene:
print(f"\n💡 【生成推荐】")
recommendation = generate_recommendation_with_confidence(matched_scene)
if recommendation:
print(f" ✓ 推荐已生成,包含 {len(recommendation.get('device_operations', []))} 个设备操作")
for idx, op in enumerate(recommendation.get('device_operations', []), 1):
confidence = op.get('confidence_score', 0)
confidence_emoji = "🟢" if confidence > 0.7 else "🟡" if confidence > 0.5 else "🔴"
print(f" {idx}. {op['device_type']}.{op['action']} {confidence_emoji} (置信度: {confidence:.2f})")
if op.get('parameters'):
params_str = ', '.join([f"{k}={v}" for k, v in list(op['parameters'].items())[:2]])
print(f" 参数: {params_str}")
print(f"\n 📝 用户反馈说明:")
print(f" • 您有 5 分钟时间修改推荐操作")
print(f" • 修改后的操作置信度会提高")
print(f" • 系统会学习您的修改偏好")
result = {
"status": "success",
"message": f"基于最近{days}天的{len(operations)}条记录,识别出{len(scenes)}个使用场景",
"user_query": user_query,
"total_operations": len(operations),
"identified_scenes": len(scenes),
"matched_scene": matched_scene,
"all_scenes": list(scenes.values()),
"recommendation": recommendation,
}
print("\n" + "="*80)
print(f"✅ 【数据挖掘完成】")
print(f" 总记录数: {len(operations)}")
print(f" 识别场景: {len(scenes)}")
print(f" 匹配状态: {'成功' if matched_scene else '未匹配'}")
print(f" 推荐状态: {'已生成' if recommendation else ''}")
print("="*80 + "\n")
# 转换 numpy 类型为 Python 原生类型,确保可以 JSON 序列化
result = convert_numpy_types(result)
return json.dumps(result, ensure_ascii=False, indent=2)
finally:
conn.close()
except Exception as e:
logger.error(f"❌ 场景习惯查询失败: {e}", exc_info=True)
return json.dumps(
{
"status": "error",
"message": f"分析失败: {str(e)}",
"recommendation": None,
},
ensure_ascii=False,
indent=2,
)
def match_scene_by_query(user_query: str, scenes: Dict[int, Dict]) -> Optional[Dict]:
"""
根据用户查询匹配最相关的场景
Args:
user_query: 用户查询文本
scenes: 场景字典
Returns:
匹配的场景信息
"""
# 简单的关键词匹配策略
query_lower = user_query.lower()
# 时间关键词映射
time_keywords = {
"早上": ["早上", "早晨", "起床", "上午"],
"下午": ["下午", "午后"],
"晚上": ["晚上", "傍晚", "晚饭", "晚餐"],
"夜晚": ["夜晚", "睡觉", "睡眠", "休息", "晚安"],
}
# 设备关键词映射
device_keywords = {
"air_conditioner": ["空调", "ac", "制冷", "制热", "温度"],
"air_cleaner": ["空气净化器", "净化器", "空气", "pm2.5"],
"bedside_lamp": ["床头灯", "", "照明", "亮度"],
}
# 找出匹配的时间段
matched_time_period = None
for period, keywords in time_keywords.items():
if any(kw in query_lower for kw in keywords):
matched_time_period = period
break
# 找出匹配的设备类型
matched_device_type = None
for device, keywords in device_keywords.items():
if any(kw in query_lower for kw in keywords):
matched_device_type = device
break
# 评分并选择最佳匹配场景
best_scene = None
best_score = 0
for scene in scenes.values():
score = 0
# 时间匹配得分
if matched_time_period and scene["time_period"] == matched_time_period:
score += 10
# 设备匹配得分
if matched_device_type and matched_device_type in scene["device_actions"]:
score += 10
# 出现频率得分(归一化)
score += scene["occurrence_count"] * 0.1
if score > best_score:
best_score = score
best_scene = scene
# 如果没有明确匹配,选择出现最频繁的场景
if best_score == 0 and scenes:
best_scene = max(scenes.values(), key=lambda s: s["occurrence_count"])
return best_scene
# 注意:此函数已废弃,请使用 generate_recommendation_with_confidence
# def generate_recommendation(scene: Dict) -> Dict[str, Any]:
# """已废弃:使用 generate_recommendation_with_confidence 替代"""
# pass
@tool("get_data_mining_status", description="获取数据挖掘Agent的状态和统计信息")
def get_data_mining_status() -> str:
"""
获取数据挖掘Agent的状态
Returns:
JSON格式的状态信息
"""
try:
conn = get_db_connection()
if not conn:
return json.dumps(
{"status": "error", "message": "数据库连接失败"},
ensure_ascii=False,
indent=2,
)
try:
# 统计数据库中的记录数
with conn.cursor() as cursor:
cursor.execute(
"SELECT COUNT(*) as total FROM device_operations WHERE success = TRUE"
)
total_ops = cursor.fetchone()["total"]
cursor.execute(
"SELECT COUNT(DISTINCT system_user_id) as users FROM device_operations"
)
total_users = cursor.fetchone()["users"]
cursor.execute(
"""
SELECT device_type, COUNT(*) as count
FROM device_operations
WHERE success = TRUE
GROUP BY device_type
"""
)
device_stats = cursor.fetchall()
status_info = {
"status": "online",
"message": "数据挖掘Agent运行正常",
"database_status": "connected",
"statistics": {
"total_operations": total_ops,
"total_users": total_users,
"device_breakdown": {
stat["device_type"]: stat["count"] for stat in device_stats
},
},
"capabilities": [
"GMM场景聚类分析",
"用户习惯挖掘",
"个性化场景推荐",
"历史行为分析",
"用户反馈学习",
"置信度动态调整",
],
}
# 转换 numpy 类型为 Python 原生类型
status_info = convert_numpy_types(status_info)
return json.dumps(status_info, ensure_ascii=False, indent=2)
finally:
conn.close()
except Exception as e:
logger.error(f"❌ 获取状态失败: {e}")
return json.dumps(
{"status": "error", "message": f"获取状态失败: {str(e)}"},
ensure_ascii=False,
indent=2,
)
# ============================================
# 增强功能:置信度计算和用户反馈学习
# ============================================
def calculate_action_confidence(
action_info: Dict[str, Any],
scene_occurrence: int,
total_operations: int,
is_user_modified: bool = False,
) -> float:
"""
计算设备操作的置信度
Args:
action_info: 操作信息(包含频次)
scene_occurrence: 场景出现次数
total_operations: 总操作数
is_user_modified: 是否被用户修改过
Returns:
置信度分数0-1
"""
# 基础置信度:基于频次
frequency = action_info.get("frequency", 1)
base_confidence = min(frequency / scene_occurrence, 1.0)
# 场景稳定性:场景出现频率
scene_stability = min(scene_occurrence / total_operations, 1.0)
# 综合置信度
confidence = base_confidence * 0.7 + scene_stability * 0.3
# 用户修改过的操作,置信度最高
if is_user_modified:
confidence = min(confidence * 1.5, 1.0)
return round(confidence, 3)
def adjust_confidence_by_feedback(
original_params: Dict, modified_params: Dict, original_confidence: float
) -> float:
"""
根据用户反馈调整置信度
Args:
original_params: 原始推荐参数
modified_params: 用户修改后的参数
original_confidence: 原始置信度
Returns:
调整后的置信度
"""
if not original_params or not modified_params:
return original_confidence * 0.5
# 计算参数差异
differences = 0
total_params = len(original_params)
for key in original_params:
if key in modified_params:
if original_params[key] != modified_params[key]:
# 计算差异程度
if isinstance(original_params[key], (int, float)) and isinstance(
modified_params[key], (int, float)
):
# 数值型参数:计算相对差异
diff = abs(original_params[key] - modified_params[key]) / max(
abs(original_params[key]), 1
)
differences += min(diff, 1.0)
else:
# 非数值型参数:完全不同
differences += 1.0
# 差异越大,置信度降低越多
if total_params > 0:
diff_ratio = differences / total_params
adjusted_confidence = original_confidence * (
1 - diff_ratio * 0.8
) # 最多降低80%
else:
adjusted_confidence = original_confidence * 0.5
return max(round(adjusted_confidence, 3), 0.1) # 最低保留0.1
def query_operations_with_db_preprocessing(
system_user_id: int, days: int = 30, include_confidence: bool = True
) -> List[Dict[str, Any]]:
"""
从数据库查询操作记录利用StarRocks进行预处理
Args:
system_user_id: 用户ID
days: 查询天数
include_confidence: 是否包含置信度信息
Returns:
预处理后的操作记录列表(包含特征)
"""
conn = get_db_connection()
if not conn:
return []
try:
start_date = (datetime.now() - timedelta(days=days)).strftime(
"%Y-%m-%d %H:%M:%S"
)
# 使用视图进行数据库端预处理
if include_confidence:
sql = """
SELECT
id, system_user_id, created_at, context_id,
device_type, device_name, action, parameters,
confidence_score, is_user_modified, confidence_source,
hour, minute, day_of_week, is_weekend,
is_morning, is_afternoon, is_evening, is_night
FROM enhanced_operations_view
WHERE system_user_id = %s
AND created_at >= %s
ORDER BY created_at DESC
"""
else:
sql = """
SELECT
id, system_user_id, created_at, context_id,
device_type, device_name, action, parameters, success
FROM device_operations
WHERE system_user_id = %s
AND created_at >= %s
AND success = TRUE
ORDER BY created_at DESC
"""
with conn.cursor() as cursor:
cursor.execute(sql, (system_user_id, start_date))
operations = cursor.fetchall()
logger.info(f"📊 从数据库查询到 {len(operations)} 条记录(已包含预处理特征)")
return operations
except Exception as e:
logger.error(f"❌ 查询操作记录失败: {e}")
return []
finally:
conn.close()
def save_user_feedback(
system_user_id: int,
context_id: str,
original_recommendation: Dict[str, Any],
user_modification: Dict[str, Any],
scene_matched: str,
time_period: str,
) -> bool:
"""
保存用户反馈到数据库
Args:
system_user_id: 用户ID
context_id: 会话ID
original_recommendation: 原始推荐
user_modification: 用户修改
scene_matched: 匹配的场景
time_period: 时间段
Returns:
是否保存成功
"""
conn = get_db_connection()
if not conn:
return False
try:
feedback_id = int(datetime.now().timestamp() * 1000000)
# 判断反馈类型
if not user_modification:
feedback_type = "rejected"
elif user_modification == original_recommendation:
feedback_type = "accepted"
else:
feedback_type = "modified"
sql = """
INSERT INTO user_feedback
(id, system_user_id, created_at, context_id,
original_recommendation, user_modification, scene_matched, time_period,
feedback_type, feedback_timestamp, is_processed)
VALUES (%s, %s, NOW(), %s, %s, %s, %s, %s, %s, NOW(), FALSE)
"""
with conn.cursor() as cursor:
cursor.execute(
sql,
(
feedback_id,
system_user_id,
context_id,
json.dumps(original_recommendation, ensure_ascii=False),
json.dumps(user_modification, ensure_ascii=False),
scene_matched,
time_period,
feedback_type,
),
)
logger.info(f"✅ 用户反馈已保存: {feedback_type}, scene={scene_matched}")
return True
except Exception as e:
logger.error(f"❌ 保存用户反馈失败: {e}")
return False
finally:
conn.close()
def update_confidence_scores(
system_user_id: int, feedback_data: Dict[str, Any]
) -> bool:
"""
根据用户反馈更新置信度分数
Args:
system_user_id: 用户ID
feedback_data: 反馈数据
Returns:
是否更新成功
"""
conn = get_db_connection()
if not conn:
return False
try:
original_rec = feedback_data.get("original_recommendation", {})
user_mod = feedback_data.get("user_modification", {})
for action in original_rec.get("suggested_actions", []):
device_type = action.get("device_type")
original_params = action.get("parameters", {})
# 查找用户修改后的对应操作
modified_action = None
for mod_action in user_mod.get("suggested_actions", []):
if mod_action.get("device_type") == device_type:
modified_action = mod_action
break
if modified_action:
modified_params = modified_action.get("parameters", {})
original_confidence = action.get("confidence", 0.5)
# 计算调整后的置信度
new_confidence = adjust_confidence_by_feedback(
original_params, modified_params, original_confidence
)
# 保存到数据库
confidence_id = int(datetime.now().timestamp() * 1000000)
sql = """
INSERT INTO device_operation_confidence
(id, operation_id, system_user_id, created_at, device_type,
action, parameters, confidence_score, source, is_user_modified,
original_confidence, adjustment_reason)
VALUES (%s, %s, %s, NOW(), %s, %s, %s, %s, %s, %s, %s, %s)
"""
with conn.cursor() as cursor:
cursor.execute(
sql,
(
confidence_id,
0, # 临时值,实际应关联具体操作
system_user_id,
device_type,
modified_action.get("action"),
json.dumps(modified_params, ensure_ascii=False),
new_confidence,
"user_feedback",
True,
original_confidence,
f"用户修改,原参数差异调整",
),
)
logger.info(f"✅ 置信度分数已更新")
return True
except Exception as e:
logger.error(f"❌ 更新置信度失败: {e}")
return False
finally:
conn.close()
def generate_recommendation_with_confidence(scene: Dict) -> Dict[str, Any]:
"""
基于场景生成带置信度的设备操作建议(增强版)
Args:
scene: 场景信息
Returns:
包含置信度的操作建议字典
"""
recommendations = {
"scene_name": scene["scene_name"],
"time_period": scene["time_period"],
"confidence": "" if scene["occurrence_count"] > 10 else "",
"suggested_actions": [],
"feedback_window": 300, # 5分钟 = 300秒
"feedback_instruction": "您有5分钟时间对推荐操作进行调整系统会学习您的偏好",
}
# 确保 total_ops 是 Python int
total_ops = int(scene["occurrence_count"])
# 为每个设备生成具体建议(包含置信度)
for device_type, actions in scene["device_actions"].items():
device_name_map = {
"air_conditioner": "空调",
"air_cleaner": "空气净化器",
"bedside_lamp": "床头灯",
}
device_name = device_name_map.get(device_type, device_type)
# 统计最常见的操作和参数
action_stats = {}
for action_info in actions:
action_key = action_info["action"]
if action_key not in action_stats:
action_stats[action_key] = {"count": 0, "parameters": []}
action_stats[action_key]["count"] += 1
if action_info["parameters"]:
action_stats[action_key]["parameters"].append(action_info["parameters"])
# 选择最常见的操作(带置信度)
for action, stats in sorted(
action_stats.items(), key=lambda x: x[1]["count"], reverse=True
)[:3]:
# 计算置信度
confidence = calculate_action_confidence(
{"frequency": stats["count"]}, total_ops, total_ops
)
suggestion = {
"device_type": device_type,
"device_name": device_name,
"action": action,
"frequency": int(stats["count"]), # 确保是 Python int
"parameters": stats["parameters"][0] if stats["parameters"] else {},
"confidence": float(confidence), # 确保是 Python float
"confidence_level": (
"" if confidence > 0.7 else ("" if confidence > 0.4 else "")
),
}
recommendations["suggested_actions"].append(suggestion)
return recommendations
class FeedbackArgs(BaseModel):
context_id: str = Field(..., description="会话上下文ID")
original_recommendation: Dict[str, Any] = Field(
..., description="原始推荐内容JSON"
)
user_modification: Dict[str, Any] = Field(
..., description="用户修改后的内容JSON"
)
system_user_id: int = Field(default=1, description="系统用户ID")
@tool(
"submit_user_feedback",
args_schema=FeedbackArgs,
description="提交用户对推荐操作的反馈5分钟窗口内用于改进GMM模型准确性",
)
def submit_user_feedback(
context_id: str,
original_recommendation: Dict[str, Any],
user_modification: Dict[str, Any],
system_user_id: int = 1,
) -> str:
"""
提交用户反馈,用于在线学习
Args:
context_id: 会话ID
original_recommendation: 原始推荐
user_modification: 用户修改
system_user_id: 用户ID
Returns:
反馈处理结果
"""
try:
print("\n" + "="*80)
print(f"📝 【用户反馈处理】")
print("="*80)
print(f"👤 用户ID: {system_user_id}")
print(f"🔖 会话ID: {context_id}")
scene_matched = original_recommendation.get("scene_name", "unknown")
time_period = original_recommendation.get("time_period", "unknown")
print(f"🎬 场景: {scene_matched} ({time_period})")
# 比较原始推荐和用户修改
is_modified = user_modification != original_recommendation
feedback_type = "modified" if is_modified else "accepted"
print(f"\n📊 反馈类型: {feedback_type}")
if is_modified:
print(f"\n🔄 用户修改详情:")
orig_ops = original_recommendation.get('device_operations', [])
user_ops = user_modification.get('device_operations', [])
# 显示修改的操作
for i, (orig_op, user_op) in enumerate(zip(orig_ops, user_ops), 1):
if orig_op != user_op:
print(f" {i}. {orig_op.get('device_type')}:")
print(f" 原始: {orig_op.get('action')} {orig_op.get('parameters', {})}")
print(f" 修改: {user_op.get('action')} {user_op.get('parameters', {})}")
else:
print(f" ✓ 用户接受了原始推荐")
# 保存反馈
print(f"\n💾 正在保存反馈到数据库...")
success = save_user_feedback(
system_user_id,
context_id,
original_recommendation,
user_modification,
scene_matched,
time_period,
)
if not success:
print(f" ❌ 保存失败")
return json.dumps(
{"status": "error", "message": "保存用户反馈失败"},
ensure_ascii=False,
indent=2,
)
print(f" ✓ 反馈已保存")
# 更新置信度
print(f"\n🎯 正在更新置信度模型...")
update_confidence_scores(
system_user_id,
{
"original_recommendation": original_recommendation,
"user_modification": user_modification,
},
)
print(f" ✓ 置信度已更新")
result = {
"status": "success",
"message": "感谢您的反馈!系统已记录您的偏好,将在下次推荐中应用",
"feedback_type": feedback_type,
"learning_status": "已更新置信度模型",
"next_recommendation": "下次将优先考虑您修改后的操作",
}
print("\n" + "="*80)
print(f"✅ 【用户反馈处理完成】")
print(f" 反馈类型: {feedback_type}")
print(f" 学习状态: 已更新")
print("="*80 + "\n")
# 转换 numpy 类型为 Python 原生类型
result = convert_numpy_types(result)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"❌ 提交用户反馈失败: {e}")
return json.dumps(
{"status": "error", "message": f"提交反馈失败: {str(e)}"},
ensure_ascii=False,
indent=2,
)