forked from agent0ai/agent-zero
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler_task_create.py
More file actions
163 lines (141 loc) · 6.36 KB
/
scheduler_task_create.py
File metadata and controls
163 lines (141 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import random
import secrets

from python.helpers.api import ApiHandler, Input, Output, Request
from python.helpers.task_scheduler import (
    TaskScheduler, ScheduledTask, AdHocTask, PlannedTask, TaskSchedule,
    serialize_task, parse_task_schedule, parse_task_plan, TaskType
)
from python.helpers.projects import load_basic_project_data
from python.helpers.localization import Localization
from python.helpers.print_style import PrintStyle
class SchedulerTaskCreate(ApiHandler):
    """API handler that creates a new scheduled, planned, or ad-hoc task."""

    @staticmethod
    def _generate_token() -> str:
        """Return a random 19-digit numeric token as a string."""
        # Same value range as the previous random.randint call
        # (10**18 .. 10**19 - 1), but drawn from `secrets` because the token
        # is used as a credential and `random` output is predictable.
        return str(10**18 + secrets.randbelow(9 * 10**18))

    @staticmethod
    def _schedule_from_string(schedule: str) -> TaskSchedule:
        """Build a TaskSchedule from a whitespace-separated cron-style string."""
        # split() with no argument collapses runs of whitespace, so input such
        # as "*/5  * * * *" does not yield empty fields.
        parts = schedule.split()
        # Missing trailing fields default to "*"; extra fields are ignored.
        minute, hour, day, month, weekday = (parts + ["*"] * 5)[:5]
        return TaskSchedule(
            minute=minute,
            hour=hour,
            day=day,
            month=month,
            weekday=weekday,
        )

    async def process(self, input: Input, request: Request) -> Output:
        """
        Create a new task in the scheduler.

        The task kind is chosen from the payload: a non-empty ``schedule``
        creates a ScheduledTask, otherwise a non-empty ``plan`` creates a
        PlannedTask, otherwise an AdHocTask is created.

        Args:
            input: Request payload. Keys read here: ``timezone``, ``name``,
                ``system_prompt``, ``prompt``, ``attachments``,
                ``project_name``, ``schedule``, ``token`` and ``plan``.
            request: The incoming request; not used by this handler.

        Returns:
            ``{"ok": True, "task": <serialized task>}`` on success, or
            ``{"error": <message>}`` when the project data cannot be loaded
            or the plan cannot be parsed.

        Raises:
            ValueError: If ``name`` or ``prompt`` is missing, or if the
                schedule cannot be parsed or is neither a string nor a dict.
        """
        printer = PrintStyle(italic=True, font_color="blue", padding=False)

        # Get timezone from input (do not set if not provided, we then rely on poll() to set it)
        if timezone := input.get("timezone", None):
            Localization.get().set_timezone(timezone)

        scheduler = TaskScheduler.get()
        await scheduler.reload()

        # Get common fields from input
        name = input.get("name")
        system_prompt = input.get("system_prompt", "")
        prompt = input.get("prompt")
        attachments = input.get("attachments", [])

        # Normalize the project slug: blank or non-string values mean "no project".
        requested_project_slug = input.get("project_name")
        if isinstance(requested_project_slug, str):
            requested_project_slug = requested_project_slug.strip() or None
        else:
            requested_project_slug = None

        project_slug = requested_project_slug
        project_color = None

        if project_slug:
            try:
                metadata = load_basic_project_data(project_slug)
                project_color = metadata.get("color") or None
            except Exception as exc:
                printer.error(f"SchedulerTaskCreate: failed to load project '{project_slug}': {exc}")
                # The failing operation is the load, so report it as such.
                return {"error": f"Loading project failed: {project_slug}"}

        # Always dedicated context for scheduler tasks created by ui
        task_context_id = None

        # Check if schedule is provided (for ScheduledTask)
        schedule = input.get("schedule", {})

        raw_token = input.get("token", "")
        # NOTE(review): the token is written to the log in plain text below;
        # confirm this debug output is acceptable outside development.
        # len() is taken on the string form so a non-string token does not crash.
        printer.print(f"Token received from frontend: '{raw_token}' (type: {type(raw_token)}, length: {len(str(raw_token)) if raw_token else 0})")

        # Generate a random token if empty or not provided
        if raw_token:
            token = str(raw_token)
        else:
            token = self._generate_token()
            printer.print(f"Generated new token: '{token}'")

        plan = input.get("plan", {})

        # Validate required fields (system_prompt is optional and defaults to "")
        if not name or not prompt:
            raise ValueError("Missing required fields: name, prompt")

        task = None
        if schedule:
            # Create a scheduled task; the schedule may be a string or an object
            if isinstance(schedule, str):
                task_schedule = self._schedule_from_string(schedule)
            elif isinstance(schedule, dict):
                # Use our standardized parsing function
                try:
                    task_schedule = parse_task_schedule(schedule)
                except ValueError as e:
                    # Re-raise as a plain ValueError but keep the cause chained.
                    raise ValueError(str(e)) from e
            else:
                raise ValueError("Invalid schedule format. Must be string or object.")

            task = ScheduledTask.create(
                name=name,
                system_prompt=system_prompt,
                prompt=prompt,
                schedule=task_schedule,
                attachments=attachments,
                context_id=task_context_id,
                timezone=timezone,
                project_name=project_slug,
                project_color=project_color,
            )
        elif plan:
            # Create a planned task
            try:
                # Use our standardized parsing function
                task_plan = parse_task_plan(plan)
            except ValueError as e:
                return {"error": str(e)}

            task = PlannedTask.create(
                name=name,
                system_prompt=system_prompt,
                prompt=prompt,
                plan=task_plan,
                attachments=attachments,
                context_id=task_context_id,
                project_name=project_slug,
                project_color=project_color,
            )
        else:
            # Create an ad-hoc task
            printer.print(f"Creating AdHocTask with token: '{token}'")
            task = AdHocTask.create(
                name=name,
                system_prompt=system_prompt,
                prompt=prompt,
                token=token,
                attachments=attachments,
                context_id=task_context_id,
                project_name=project_slug,
                project_color=project_color,
            )
            # Verify token after creation
            if isinstance(task, AdHocTask):
                printer.print(f"AdHocTask created with token: '{task.token}'")

        # Add the task to the scheduler
        await scheduler.add_task(task)

        # Verify the task was added correctly - retrieve by UUID to check persistence
        saved_task = scheduler.get_task_by_uuid(task.uuid)
        if saved_task:
            if saved_task.type == TaskType.AD_HOC and isinstance(saved_task, AdHocTask):
                printer.print(f"Task verified after save, token: '{saved_task.token}'")
            else:
                printer.print("Task verified after save, not an adhoc task")
        else:
            printer.print("WARNING: Task not found after save!")

        # Return the created task using our standardized serialization function
        task_dict = serialize_task(task)

        # Debug log the serialized task
        if task_dict and task_dict.get('type') == 'adhoc':
            printer.print(f"Serialized adhoc task, token in response: '{task_dict.get('token')}'")

        return {
            "ok": True,
            "task": task_dict
        }