forked from agent0ai/agent-zero
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_message.py
More file actions
141 lines (117 loc) · 5.3 KB
/
api_message.py
File metadata and controls
141 lines (117 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import base64
import os
from datetime import datetime, timedelta
from agent import AgentContext, UserMessage, AgentContextType
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import files
from python.helpers.print_style import PrintStyle
from werkzeug.utils import secure_filename
from initialize import initialize_agent
import threading
class ApiMessage(ApiHandler):
# Track chat lifetimes for cleanup
_chat_lifetimes = {}
_cleanup_lock = threading.Lock()
@classmethod
def requires_auth(cls) -> bool:
return False # No web auth required
@classmethod
def requires_csrf(cls) -> bool:
return False # No CSRF required
@classmethod
def requires_api_key(cls) -> bool:
return True # Require API key
async def process(self, input: dict, request: Request) -> dict | Response:
# Extract parameters
context_id = input.get("context_id", "")
message = input.get("message", "")
attachments = input.get("attachments", [])
lifetime_hours = input.get("lifetime_hours", 24) # Default 24 hours
if not message:
return Response('{"error": "Message is required"}', status=400, mimetype="application/json")
# Handle attachments (base64 encoded)
attachment_paths = []
if attachments:
upload_folder_int = "/a0/tmp/uploads"
upload_folder_ext = files.get_abs_path("tmp/uploads")
os.makedirs(upload_folder_ext, exist_ok=True)
for attachment in attachments:
if not isinstance(attachment, dict) or "filename" not in attachment or "base64" not in attachment:
continue
try:
filename = secure_filename(attachment["filename"])
if not filename:
continue
# Decode base64 content
file_content = base64.b64decode(attachment["base64"])
# Save to temp file
save_path = os.path.join(upload_folder_ext, filename)
with open(save_path, "wb") as f:
f.write(file_content)
attachment_paths.append(os.path.join(upload_folder_int, filename))
except Exception as e:
PrintStyle.error(f"Failed to process attachment {attachment.get('filename', 'unknown')}: {e}")
continue
# Get or create context
if context_id:
context = AgentContext.use(context_id)
if not context:
return Response('{"error": "Context not found"}', status=404, mimetype="application/json")
else:
config = initialize_agent()
context = AgentContext(config=config, type=AgentContextType.USER)
AgentContext.use(context.id)
context_id = context.id
# Update chat lifetime
with self._cleanup_lock:
self._chat_lifetimes[context_id] = datetime.now() + timedelta(hours=lifetime_hours)
# Process message
try:
# Log the message
attachment_filenames = [os.path.basename(path) for path in attachment_paths] if attachment_paths else []
PrintStyle(
background_color="#6C3483", font_color="white", bold=True, padding=True
).print("External API message:")
PrintStyle(font_color="white", padding=False).print(f"> {message}")
if attachment_filenames:
PrintStyle(font_color="white", padding=False).print("Attachments:")
for filename in attachment_filenames:
PrintStyle(font_color="white", padding=False).print(f"- {filename}")
# Add user message to chat history so it's visible in the UI
context.log.log(
type="user",
heading="User message",
content=message,
kvps={"attachments": attachment_filenames},
)
# Send message to agent
task = context.communicate(UserMessage(message, attachment_paths))
result = await task.result()
# Clean up expired chats
self._cleanup_expired_chats()
return {
"context_id": context_id,
"response": result
}
except Exception as e:
PrintStyle.error(f"External API error: {e}")
return Response(f'{{"error": "{str(e)}"}}', status=500, mimetype="application/json")
@classmethod
def _cleanup_expired_chats(cls):
"""Clean up expired chats"""
with cls._cleanup_lock:
now = datetime.now()
expired_contexts = [
context_id for context_id, expiry in cls._chat_lifetimes.items()
if now > expiry
]
for context_id in expired_contexts:
try:
context = AgentContext.get(context_id)
if context:
context.reset()
AgentContext.remove(context_id)
del cls._chat_lifetimes[context_id]
PrintStyle().print(f"Cleaned up expired chat: {context_id}")
except Exception as e:
PrintStyle.error(f"Failed to cleanup chat {context_id}: {e}")