# example requires websocket-client library: # pip install websocket-client import os import json import websocket url = "ws://yunwu.ai/v1/realtime?model=gpt-4o-realtime-preview" headers = [ "Authorization: Bearer sk-xxx" , "OpenAI-Beta: realtime=v1" ] def decode_unicode_string(s): """解码Unicode字符串,使中文可读""" try: # 先尝试直接解码 if isinstance(s, bytes): return s.decode('utf-8') # 如果是已经乱码的字符串,先编码再解码 return s.encode('latin1').decode('utf-8') except Exception as e: print(f"解码失败: {e}") return s def on_error(ws, error): print("\n=== 错误信息 ===") print(f"{error}") def on_close(ws, close_status_code, close_msg): print("\n=== 连接关闭 ===") print(f"状态码: {close_status_code}") print(f"关闭信息: {close_msg}") def on_open(ws): print("\n=== 连接已建立 ===") event = { "type": "response.create", "response": { "modalities": ["text"], "instructions": "Please assist the user." } } print("\n=== 发送消息 ===") print(json.dumps(event, ensure_ascii=False, indent=2)) ws.send(json.dumps(event)) def on_message(ws, message): print("\n=== 收到消息 ===") try: data = json.loads(message) # 如果存在错误信息,解码显示中文 if 'error' in data and 'message' in data['error']: data['error']['message'] = decode_unicode_string(data['error']['message']) print(json.dumps(data, ensure_ascii=False, indent=2)) # 如果是错误消息,额外显示一个简化版本 if data.get('type') == 'error': print("\n=== 错误摘要 ===") print(f"错误类型: {data['error'].get('type', 'unknown')}") print(f"错误代码: {data['error'].get('code', 'unknown')}") error_message = decode_unicode_string(data['error'].get('message', 'unknown')) print(f"错误信息: {error_message}") except json.JSONDecodeError as e: print(f"解析消息失败: {message}") print(f"错误: {e}") # 启用websocket日志,但只显示关键信息 websocket.enableTrace(False) ws = websocket.WebSocketApp( url, header=headers, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close ) print("\n=== 开始连接 ===") print(f"连接地址: {url}") ws.run_forever()