Quickstart (Python)

How to use OpenAI Computer Use with Steel

This guide walks you through using OpenAI's computer-use-preview model with Steel's managed remote browsers to create AI agents that can navigate the web.

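We'll implement a simple CUA (computer-using agent) loop: the model requests an action, we execute it in the browser and send back a screenshot, and the cycle repeats until the model reports that the task is done. The loop follows the one described in OpenAI's guide: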

Computer use - OpenAI API
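
As a rough illustration of such a loop, the sketch below runs entirely offline. `FakeComputer`, `fake_model`, and `run_loop` are hypothetical stand-ins invented for this example; they are not part of the OpenAI or Steel APIs, and the real agent built in this guide exchanges richer items with the model.

```python
from typing import Dict, List


class FakeComputer:
    """Hypothetical stand-in for a remote browser; records the actions it receives."""

    def __init__(self) -> None:
        self.actions: List[str] = []

    def perform(self, action: str) -> None:
        self.actions.append(action)

    def screenshot(self) -> str:
        # A real implementation would return a base64-encoded PNG.
        return f"screenshot-after-{len(self.actions)}-actions"


def fake_model(history: List[Dict[str, str]]) -> Dict[str, str]:
    """Hypothetical stand-in for the model: two scripted actions, then a final message."""
    screenshots_seen = sum(1 for item in history if item["type"] == "screenshot")
    scripted = ["click", "type"]
    if screenshots_seen < len(scripted):
        return {"type": "action", "value": scripted[screenshots_seen]}
    return {"type": "message", "value": "TASK_COMPLETED: done"}


def run_loop(task: str, computer: FakeComputer, max_iterations: int = 10) -> str:
    history: List[Dict[str, str]] = [{"type": "task", "value": task}]
    for _ in range(max_iterations):
        reply = fake_model(history)
        history.append(reply)
        if reply["type"] == "message":
            return reply["value"]  # no further action requested: stop
        computer.perform(reply["value"])  # execute the requested action
        history.append({"type": "screenshot", "value": computer.screenshot()})
    return "stopped: iteration limit reached"


computer = FakeComputer()
result = run_loop("demo task", computer)
print(result)
print(computer.actions)
```

The iteration cap plays the same role as a safety limit in a real agent: the loop ends even if the model never produces a final message.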

Prerequisites

  • Python 3.8+

  • A Steel API key (sign up here)

  • An OpenAI API key with access to the computer-use-preview model
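
Before starting, it can help to confirm that the environment is ready. The following is a minimal preflight sketch, not part of the guide's code: `missing_modules` and `missing_env_vars` are hypothetical helpers, and the import names listed (`steel`, `playwright`, `requests`, `dotenv`, `PIL`) are assumptions based on the libraries the code in this guide uses.

```python
import importlib.util
import os
from typing import List, Mapping, Optional

# Assumed import names for the libraries used in this guide.
REQUIRED_MODULES = ["steel", "playwright", "requests", "dotenv", "PIL"]
REQUIRED_ENV_VARS = ["STEEL_API_KEY", "OPENAI_API_KEY"]


def missing_modules(names: List[str]) -> List[str]:
    """Return the top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def missing_env_vars(names: List[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    print("Missing modules:", missing_modules(REQUIRED_MODULES))
    print("Missing environment variables:", missing_env_vars(REQUIRED_ENV_VARS))
```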

Step 1: Setup and Helper Functions

Python
utils.py
import os
import time
import base64
import json
import re
from typing import List, Dict
from urllib.parse import urlparse
import requests
from dotenv import load_dotenv
from PIL import Image
from io import BytesIO
load_dotenv(override=True)
# Replace with your own API keys
STEEL_API_KEY = os.getenv("STEEL_API_KEY") or "your-steel-api-key-here"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or "your-openai-api-key-here"
# Replace with your own task
TASK = os.getenv("TASK") or "Go to Wikipedia and search for machine learning"
SYSTEM_PROMPT = """You are an expert browser automation assistant operating in an iterative execution loop. Your goal is to efficiently complete tasks using a Chrome browser with full internet access.
<CAPABILITIES>
* You control a Chrome browser tab and can navigate to any website
* You can click, type, scroll, take screenshots, and interact with web elements
* You have full internet access and can visit any public website
* You can read content, fill forms, search for information, and perform complex multi-step tasks
* After each action, you receive a screenshot showing the current state
* Use the goto(url) function to navigate directly to URLs - DO NOT try to click address bars or browser UI
* Use the back() function to go back to the previous page
<COORDINATE_SYSTEM>
* The browser viewport has specific dimensions that you must respect
* All coordinates (x, y) must be within the viewport bounds
* X coordinates must be between 0 and the display width (inclusive)
* Y coordinates must be between 0 and the display height (inclusive)
* Always ensure your click, move, scroll, and drag coordinates are within these bounds
* If you're unsure about element locations, take a screenshot first to see the current state
<AUTONOMOUS_EXECUTION>
* Work completely independently - make decisions and act immediately without asking questions
* Never request clarification, present options, or ask for permission
* Make intelligent assumptions based on task context
* If something is ambiguous, choose the most logical interpretation and proceed
* Take immediate action rather than explaining what you might do
* When the task objective is achieved, immediately declare "TASK_COMPLETED:" - do not provide commentary or ask questions
<REASONING_STRUCTURE>
For each step, you must reason systematically:
* Analyze your previous action's success/failure and current state
* Identify what specific progress has been made toward the goal
* Determine the next immediate objective and how to achieve it
* Choose the most efficient action sequence to make progress
<EFFICIENCY_PRINCIPLES>
* Combine related actions when possible rather than single-step execution
* Navigate directly to relevant websites without unnecessary exploration
* Use screenshots strategically to understand page state before acting
* Be persistent with alternative approaches if initial attempts fail
* Focus on the specific information or outcome requested
<COMPLETION_CRITERIA>
* MANDATORY: When you complete the task, your final message MUST start with "TASK_COMPLETED: [brief summary]"
* MANDATORY: If technical issues prevent completion, your final message MUST start with "TASK_FAILED: [reason]"
* MANDATORY: If you abandon the task, your final message MUST start with "TASK_ABANDONED: [explanation]"
* Do not write anything after completing the task except the required completion message
* Do not ask questions, provide commentary, or offer additional help after task completion
* The completion message is the end of the interaction - nothing else should follow
<CRITICAL_REQUIREMENTS>
* This is fully automated execution - work completely independently
* Start by taking a screenshot to understand the current state
* Use goto(url) function for navigation - never click on browser UI elements
* Always respect coordinate boundaries - invalid coordinates will fail
* Recognize when the stated objective has been achieved and declare completion immediately
* Focus on the explicit task given, not implied or potential follow-up tasks
Remember: Be thorough but focused. Complete the specific task requested efficiently and provide clear results."""

BLOCKED_DOMAINS = [
    "maliciousbook.com",
    "evilvideos.com",
    "darkwebforum.com",
    "shadytok.com",
    "suspiciouspins.com",
    "ilanbigio.com",
]

# Maps key names sent by the model to the names Playwright expects
CUA_KEY_TO_PLAYWRIGHT_KEY = {
    "/": "Divide",
    "\\": "Backslash",
    "alt": "Alt",
    "arrowdown": "ArrowDown",
    "arrowleft": "ArrowLeft",
    "arrowright": "ArrowRight",
    "arrowup": "ArrowUp",
    "backspace": "Backspace",
    "capslock": "CapsLock",
    "cmd": "Meta",
    "ctrl": "Control",
    "delete": "Delete",
    "end": "End",
    "enter": "Enter",
    "esc": "Escape",
    "home": "Home",
    "insert": "Insert",
    "option": "Alt",
    "pagedown": "PageDown",
    "pageup": "PageUp",
    "shift": "Shift",
    "space": " ",
    "super": "Meta",
    "tab": "Tab",
    "win": "Meta",
}


def pp(obj):
    print(json.dumps(obj, indent=4))


def show_image(base_64_image):
    image_data = base64.b64decode(base_64_image)
    image = Image.open(BytesIO(image_data))
    image.show()


def sanitize_message(msg: dict) -> dict:
    """Return a copy of the message with image_url omitted for computer_call_output messages."""
    if msg.get("type") == "computer_call_output":
        output = msg.get("output", {})
        if isinstance(output, dict):
            sanitized = msg.copy()
            sanitized["output"] = {**output, "image_url": "[omitted]"}
            return sanitized
    return msg


def create_response(**kwargs):
    url = "https://api.openai.com/v1/responses"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
        "Content-Type": "application/json"
    }
    openai_org = os.getenv("OPENAI_ORG")
    if openai_org:
        headers["Openai-Organization"] = openai_org
    response = requests.post(url, headers=headers, json=kwargs)
    if response.status_code != 200:
        print(f"Error: {response.status_code} {response.text}")
    return response.json()


def check_blocklisted_url(url: str) -> None:
    """Raise ValueError if the given URL (including subdomains) is in the blocklist."""
    hostname = urlparse(url).hostname or ""
    if any(
        hostname == blocked or hostname.endswith(f".{blocked}")
        for blocked in BLOCKED_DOMAINS
    ):
        raise ValueError(f"Blocked URL: {url}")
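
To see how the blocklist matching behaves, here is a small standalone sketch of the same hostname test used by check_blocklisted_url. It returns a boolean instead of raising, and `is_blocked` and the one-entry `BLOCKED` list are names made up for this illustration.

```python
from urllib.parse import urlparse

BLOCKED = ["evilvideos.com"]  # illustrative entry taken from BLOCKED_DOMAINS


def is_blocked(url: str) -> bool:
    """True if the URL's hostname is a blocked domain or one of its subdomains."""
    hostname = urlparse(url).hostname or ""
    return any(
        hostname == blocked or hostname.endswith(f".{blocked}")
        for blocked in BLOCKED
    )


print(is_blocked("https://evilvideos.com/watch"))     # exact domain match
print(is_blocked("https://cdn.evilvideos.com/a.js"))  # subdomain match
print(is_blocked("https://notevilvideos.com/"))       # different domain, shared suffix
print(is_blocked("about:blank"))                      # no hostname at all
```

Matching on ".evilvideos.com" rather than the bare suffix is what keeps an unrelated domain such as notevilvideos.com from being blocked by accident.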

Step 2: Create Steel Browser Integration

Python
steel_browser.py
import base64
import os
import time
from typing import Dict, List

from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import sync_playwright
from steel import Steel

# Helpers defined in utils.py (Step 1)
from utils import CUA_KEY_TO_PLAYWRIGHT_KEY, check_blocklisted_url


class SteelBrowser:
    def __init__(
        self,
        width: int = 1024,
        height: int = 768,
        proxy: bool = False,
        solve_captcha: bool = False,
        virtual_mouse: bool = True,
        session_timeout: int = 900000,  # 15 minutes
        ad_blocker: bool = True,
        start_url: str = "https://www.google.com",
    ):
        self.client = Steel(
            steel_api_key=os.getenv("STEEL_API_KEY"),
        )
        self.dimensions = (width, height)
        self.proxy = proxy
        self.solve_captcha = solve_captcha
        self.virtual_mouse = virtual_mouse
        self.session_timeout = session_timeout
        self.ad_blocker = ad_blocker
        self.start_url = start_url
        self.session = None
        self._playwright = None
        self._browser = None
        self._page = None

    def get_environment(self):
        return "browser"

    def get_dimensions(self):
        return self.dimensions

    def get_current_url(self) -> str:
        return self._page.url if self._page else ""

    def __enter__(self):
        """Enter context manager - create Steel session and connect browser."""
        width, height = self.dimensions
        session_params = {
            "use_proxy": self.proxy,
            "solve_captcha": self.solve_captcha,
            "api_timeout": self.session_timeout,
            "block_ads": self.ad_blocker,
            "dimensions": {"width": width, "height": height}
        }
        self.session = self.client.sessions.create(**session_params)

        print("Steel Session created successfully!")
        print(f"View live session at: {self.session.session_viewer_url}")

        self._playwright = sync_playwright().start()
        browser = self._playwright.chromium.connect_over_cdp(
            f"{self.session.websocket_url}&apiKey={os.getenv('STEEL_API_KEY')}",
            timeout=60000
        )
        self._browser = browser
        context = browser.contexts[0]

        def handle_route(route, request):
            # Abort any request whose URL is on the blocklist
            url = request.url
            try:
                check_blocklisted_url(url)
                route.continue_()
            except ValueError:
                print(f"Blocking URL: {url}")
                route.abort()

        if self.virtual_mouse:
            # Draw a visible cursor so mouse movement shows up in screenshots
            context.add_init_script("""
            if (window.self === window.top) {
                function initCursor() {
                    const CURSOR_ID = '__cursor__';
                    if (document.getElementById(CURSOR_ID)) return;

                    const cursor = document.createElement('div');
                    cursor.id = CURSOR_ID;
                    Object.assign(cursor.style, {
                        position: 'fixed',
                        top: '0px',
                        left: '0px',
                        width: '20px',
                        height: '20px',
                        backgroundImage: 'url("data:image/svg+xml;utf8,<svg width=\\'16\\' height=\\'16\\' viewBox=\\'0 0 20 20\\' fill=\\'black\\' outline=\\'white\\' xmlns=\\'http://www.w3.org/2000/svg\\'><path d=\\'M15.8089 7.22221C15.9333 7.00888 15.9911 6.78221 15.9822 6.54221C15.9733 6.29333 15.8978 6.06667 15.7555 5.86221C15.6133 5.66667 15.4311 5.52445 15.2089 5.43555L1.70222 0.0888888C1.47111 0 1.23555 -0.0222222 0.995555 0.0222222C0.746667 0.0755555 0.537779 0.186667 0.368888 0.355555C0.191111 0.533333 0.0755555 0.746667 0.0222222 0.995555C-0.0222222 1.23555 0 1.47111 0.0888888 1.70222L5.43555 15.2222C5.52445 15.4445 5.66667 15.6267 5.86221 15.7689C6.06667 15.9111 6.28888 15.9867 6.52888 15.9955H6.58221C6.82221 15.9955 7.04445 15.9333 7.24888 15.8089C7.44445 15.6845 7.59555 15.52 7.70221 15.3155L10.2089 10.2222L15.3022 7.70221C15.5155 7.59555 15.6845 7.43555 15.8089 7.22221Z\\' ></path></svg>")',
                        backgroundSize: 'cover',
                        pointerEvents: 'none',
                        zIndex: '99999',
                        transform: 'translate(-2px, -2px)',
                    });

                    document.body.appendChild(cursor);

                    document.addEventListener("mousemove", (e) => {
                        cursor.style.top = e.clientY + "px";
                        cursor.style.left = e.clientX + "px";
                    });
                }

                requestAnimationFrame(function checkBody() {
                    if (document.body) {
                        initCursor();
                    } else {
                        requestAnimationFrame(checkBody);
                    }
                });
            }
            """)

        self._page = context.pages[0]
        self._page.route("**/*", handle_route)

        self._page.set_viewport_size({"width": width, "height": height})

        self._page.goto(self.start_url)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._page:
            self._page.close()
        if self._browser:
            self._browser.close()
        if self._playwright:
            self._playwright.stop()

        if self.session:
            print("Releasing Steel session...")
            self.client.sessions.release(self.session.id)
            print(f"Session completed. View replay at {self.session.session_viewer_url}")

    def screenshot(self) -> str:
        """Take a screenshot using Playwright for consistent viewport sizing."""
        try:
            width, height = self.dimensions
            png_bytes = self._page.screenshot(
                full_page=False,
                clip={"x": 0, "y": 0, "width": width, "height": height}
            )
            return base64.b64encode(png_bytes).decode("utf-8")
        except PlaywrightError as error:
            print(f"Screenshot failed, trying CDP fallback: {error}")
            try:
                cdp_session = self._page.context.new_cdp_session(self._page)
                result = cdp_session.send(
                    "Page.captureScreenshot", {"format": "png", "fromSurface": False}
                )
                return result["data"]
            except PlaywrightError as cdp_error:
                print(f"CDP screenshot also failed: {cdp_error}")
                raise error

    def click(self, x: int, y: int, button: str = "left") -> None:
        if button == "back":
            self.back()
        elif button == "forward":
            self.forward()
        elif button == "wheel":
            self._page.mouse.wheel(x, y)
        else:
            button_type = {"left": "left", "right": "right"}.get(button, "left")
            self._page.mouse.click(x, y, button=button_type)

    def double_click(self, x: int, y: int) -> None:
        self._page.mouse.dblclick(x, y)

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        self._page.mouse.move(x, y)
        self._page.evaluate(f"window.scrollBy({scroll_x}, {scroll_y})")

    def type(self, text: str) -> None:
        self._page.keyboard.type(text)

    def wait(self, ms: int = 1000) -> None:
        time.sleep(ms / 1000)

    def move(self, x: int, y: int) -> None:
        self._page.mouse.move(x, y)

    def keypress(self, keys: List[str]) -> None:
        """Press keys (supports modifier combinations)."""
        mapped_keys = [CUA_KEY_TO_PLAYWRIGHT_KEY.get(key.lower(), key) for key in keys]
        for key in mapped_keys:
            self._page.keyboard.down(key)
        for key in reversed(mapped_keys):
            self._page.keyboard.up(key)

    def drag(self, path: List[Dict[str, int]]) -> None:
        if not path:
            return
        start_x, start_y = path[0]["x"], path[0]["y"]
        self._page.mouse.move(start_x, start_y)
        self._page.mouse.down()
        for point in path[1:]:
            scaled_x, scaled_y = point["x"], point["y"]
            self._page.mouse.move(scaled_x, scaled_y)
        self._page.mouse.up()

    def goto(self, url: str) -> None:
        try:
            self._page.goto(url)
        except Exception as e:
            print(f"Error navigating to {url}: {e}")

    def back(self) -> None:
        self._page.go_back()

    def forward(self) -> None:
        self._page.go_forward()
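
The keypress method holds keys down in order and releases them in reverse, which is how a combination such as Ctrl+A is produced. The sketch below reproduces that logic against a recording stub so the event order can be inspected without a browser. `RecordingKeyboard` and the three-entry `KEY_MAP` are hypothetical, introduced only for this illustration.

```python
from typing import Dict, List, Tuple

# Small subset of CUA_KEY_TO_PLAYWRIGHT_KEY, enough for the demo.
KEY_MAP: Dict[str, str] = {"ctrl": "Control", "shift": "Shift", "enter": "Enter"}


class RecordingKeyboard:
    """Hypothetical stand-in for a keyboard object; records down/up events."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, str]] = []

    def down(self, key: str) -> None:
        self.events.append(("down", key))

    def up(self, key: str) -> None:
        self.events.append(("up", key))


def keypress(keyboard: RecordingKeyboard, keys: List[str]) -> None:
    # Unknown keys (for example "a") pass through unchanged.
    mapped = [KEY_MAP.get(key.lower(), key) for key in keys]
    for key in mapped:
        keyboard.down(key)
    for key in reversed(mapped):
        keyboard.up(key)


keyboard = RecordingKeyboard()
keypress(keyboard, ["CTRL", "a"])
print(keyboard.events)
```

Because lookups are done on the lowercased name, the model may send "CTRL", "Ctrl", or "ctrl" and all resolve to the same key.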

Step 3: Create the Agent Class

Python
agent.py
import base64
import json
import re
from io import BytesIO
from typing import List, Optional

from PIL import Image

# Helpers defined in utils.py (Step 1)
from utils import (
    SYSTEM_PROMPT,
    check_blocklisted_url,
    create_response,
    pp,
    sanitize_message,
    show_image,
)


class Agent:
    def __init__(
        self,
        model: str = "computer-use-preview",
        computer=None,
        tools: Optional[List[dict]] = None,
        auto_acknowledge_safety: bool = True,
    ):
        self.model = model
        self.computer = computer
        self.tools = tools or []
        self.auto_acknowledge_safety = auto_acknowledge_safety
        self.print_steps = True
        self.debug = False
        self.show_images = False

        if computer:
            scaled_width, scaled_height = computer.get_dimensions()
            self.viewport_width = scaled_width
            self.viewport_height = scaled_height

            # Create dynamic system prompt with viewport dimensions
            self.system_prompt = SYSTEM_PROMPT.replace(
                '<COORDINATE_SYSTEM>',
                f'<COORDINATE_SYSTEM>\n* The browser viewport dimensions are {scaled_width}x{scaled_height} pixels\n* The browser viewport has specific dimensions that you must respect'
            )

            self.tools.append({
                "type": "computer-preview",
                "display_width": scaled_width,
                "display_height": scaled_height,
                "environment": computer.get_environment(),
            })

            # Add goto function tool for direct URL navigation
            self.tools.append({
                "type": "function",
                "name": "goto",
                "description": "Navigate directly to a specific URL.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "url": {
                            "type": "string",
                            "description": "Fully qualified URL to navigate to (e.g., https://example.com).",
                        },
                    },
                    "additionalProperties": False,
                    "required": ["url"],
                },
            })

            # Add back function tool for browser navigation
            self.tools.append({
                "type": "function",
                "name": "back",
                "description": "Go back to the previous page.",
                "parameters": {},
            })
        else:
            self.viewport_width = 1024
            self.viewport_height = 768
            self.system_prompt = SYSTEM_PROMPT

def debug_print(self, *args):
66
if self.debug:
67
pp(*args)
68
69
def get_viewport_info(self) -> dict:
70
"""Get detailed viewport information for debugging."""
71
if not self.computer or not self.computer._page:
72
return {}
73
74
try:
75
return self.computer._page.evaluate("""
76
() => ({
77
innerWidth: window.innerWidth,
78
innerHeight: window.innerHeight,
79
devicePixelRatio: window.devicePixelRatio,
80
screenWidth: window.screen.width,
81
screenHeight: window.screen.height,
82
scrollX: window.scrollX,
83
scrollY: window.scrollY
84
})
85
""")
86
except:
87
return {}
88
    def validate_screenshot_dimensions(self, screenshot_base64: str) -> dict:
        """Validate screenshot dimensions against viewport."""
        try:
            image_data = base64.b64decode(screenshot_base64)
            image = Image.open(BytesIO(image_data))
            screenshot_width, screenshot_height = image.size

            viewport_info = self.get_viewport_info()

            scaling_info = {
                "screenshot_size": (screenshot_width, screenshot_height),
                "viewport_size": (self.viewport_width, self.viewport_height),
                "actual_viewport": (viewport_info.get('innerWidth', 0), viewport_info.get('innerHeight', 0)),
                "device_pixel_ratio": viewport_info.get('devicePixelRatio', 1.0),
                "width_scale": screenshot_width / self.viewport_width if self.viewport_width > 0 else 1.0,
                "height_scale": screenshot_height / self.viewport_height if self.viewport_height > 0 else 1.0
            }

            # Warn about scaling mismatches
            if scaling_info["width_scale"] != 1.0 or scaling_info["height_scale"] != 1.0:
                print("⚠️ Screenshot scaling detected:")
                print(f" Screenshot: {screenshot_width}x{screenshot_height}")
                print(f" Expected viewport: {self.viewport_width}x{self.viewport_height}")
                print(f" Actual viewport: {viewport_info.get('innerWidth', 'unknown')}x{viewport_info.get('innerHeight', 'unknown')}")
                print(f" Scale factors: {scaling_info['width_scale']:.3f}x{scaling_info['height_scale']:.3f}")

            return scaling_info
        except Exception as e:
            print(f"⚠️ Error validating screenshot dimensions: {e}")
            return {}

    def validate_coordinates(self, action_args: dict) -> dict:
        """Coerce coordinates to integers (values are not clamped to the viewport)."""
        validated_args = action_args.copy()

        # Handle single coordinates (click, move, etc.)
        if 'x' in action_args and 'y' in action_args:
            validated_args['x'] = int(float(action_args['x']))
            validated_args['y'] = int(float(action_args['y']))

        # Handle path arrays (drag)
        if 'path' in action_args and isinstance(action_args['path'], list):
            validated_path = []
            for point in action_args['path']:
                validated_path.append({
                    'x': int(float(point.get('x', 0))),
                    'y': int(float(point.get('y', 0)))
                })
            validated_args['path'] = validated_path

        return validated_args

    def handle_item(self, item):
        """Handle each item from OpenAI response."""
        if item["type"] == "message":
            if self.print_steps:
                print(item["content"][0]["text"])

        elif item["type"] == "function_call":
            name, args = item["name"], json.loads(item["arguments"])
            if self.print_steps:
                print(f"{name}({args})")

            # Dispatch goto/back to the matching method on the computer
            if hasattr(self.computer, name):
                method = getattr(self.computer, name)
                method(**args)

            return [{
                "type": "function_call_output",
                "call_id": item["call_id"],
                "output": "success",
            }]

        elif item["type"] == "computer_call":
            action = item["action"]
            action_type = action["type"]
            action_args = {k: v for k, v in action.items() if k != "type"}

            # Coerce coordinates to integers before dispatching the action
            validated_args = self.validate_coordinates(action_args)

            if self.print_steps:
                print(f"{action_type}({validated_args})")

            method = getattr(self.computer, action_type)
            method(**validated_args)

            # Every computer action is answered with a fresh screenshot
            screenshot_base64 = self.computer.screenshot()

            # Validate screenshot dimensions for debugging
            if action_type == "screenshot" or self.debug:
                self.validate_screenshot_dimensions(screenshot_base64)

            if self.show_images:
                show_image(screenshot_base64)

            pending_checks = item.get("pending_safety_checks", [])
            for check in pending_checks:
                message = check["message"]
                if self.auto_acknowledge_safety:
                    print(f"⚠️ Auto-acknowledging safety check: {message}")
                else:
                    raise ValueError(f"Safety check failed: {message}")

            call_output = {
                "type": "computer_call_output",
                "call_id": item["call_id"],
                "acknowledged_safety_checks": pending_checks,
                "output": {
                    "type": "input_image",
                    "image_url": f"data:image/png;base64,{screenshot_base64}",
                },
            }

            if self.computer.get_environment() == "browser":
                current_url = self.computer.get_current_url()
                check_blocklisted_url(current_url)
                call_output["output"]["current_url"] = current_url

            return [call_output]

        return []

    def execute_task(
        self,
        task: str,
        print_steps: bool = True,
        debug: bool = False,
        max_iterations: int = 50
    ) -> str:
        self.print_steps = print_steps
        self.debug = debug
        self.show_images = False

        input_items = [
            {
                "role": "system",
                "content": self.system_prompt,
            },
            {
                "role": "user",
                "content": task,
            },
        ]

        new_items = []
        iterations = 0
        consecutive_no_actions = 0
        last_assistant_messages = []

        print(f"🎯 Executing task: {task}")
        print("=" * 60)

        def is_task_complete(content: str) -> dict:
            """Check if the task is complete based on content patterns."""

            # Explicit completion markers
            if "TASK_COMPLETED:" in content:
                return {"completed": True, "reason": "explicit_completion"}
            if "TASK_FAILED:" in content or "TASK_ABANDONED:" in content:
                return {"completed": True, "reason": "explicit_failure"}

            # Natural completion patterns
            completion_patterns = [
                r'task\s+(completed|finished|done|accomplished)',
                r'successfully\s+(completed|finished|found|gathered)',
                r'here\s+(is|are)\s+the\s+(results?|information|summary)',
                r'to\s+summarize',
                r'in\s+conclusion',
                r'final\s+(answer|result|summary)'
            ]

            # Failure/abandonment patterns
            failure_patterns = [
                r'cannot\s+(complete|proceed|access|continue)',
                r'unable\s+to\s+(complete|access|find|proceed)',
                r'blocked\s+by\s+(captcha|security|authentication)',
                r'giving\s+up',
                r'no\s+longer\s+able',
                r'have\s+tried\s+multiple\s+approaches'
            ]

            for pattern in completion_patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    return {"completed": True, "reason": "natural_completion"}

            for pattern in failure_patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    return {"completed": True, "reason": "natural_failure"}

            return {"completed": False}

        def detect_repetition(new_message: str) -> bool:
            """Detect if the message is too similar to recent messages."""
            if len(last_assistant_messages) < 2:
                return False

            def similarity(str1: str, str2: str) -> float:
                words1 = str1.lower().split()
                words2 = str2.lower().split()
                longest = max(len(words1), len(words2))
                if longest == 0:
                    return 0.0  # both messages empty: avoid dividing by zero
                common_words = [word for word in words1 if word in words2]
                return len(common_words) / longest

            return any(similarity(new_message, prev_message) > 0.8
                       for prev_message in last_assistant_messages)

        while iterations < max_iterations:
            iterations += 1
            has_actions = False

            if new_items and new_items[-1].get("role") == "assistant":
                last_message = new_items[-1]
                if last_message.get("content") and len(last_message["content"]) > 0:
                    content = last_message["content"][0].get("text", "")

                    # Check for explicit completion
                    completion = is_task_complete(content)
                    if completion["completed"]:
                        print(f"✅ Task completed ({completion['reason']})")
                        break

                    # Check for repetition
                    if detect_repetition(content):
                        print("🔄 Repetition detected - stopping execution")
                        last_assistant_messages.append(content)
                        break

                    # Track assistant messages for repetition detection
                    last_assistant_messages.append(content)
                    if len(last_assistant_messages) > 3:
                        last_assistant_messages.pop(0)  # Keep only last 3

            self.debug_print([sanitize_message(msg) for msg in input_items + new_items])

            try:
                response = create_response(
                    model=self.model,
                    input=input_items + new_items,
                    tools=self.tools,
                    truncation="auto",
                )
                self.debug_print(response)

                if "output" not in response:
                    if self.debug:
                        print(response)
                    raise ValueError("No output from model")

                new_items += response["output"]

                # Check if this iteration had any actions
                for item in response["output"]:
                    if item.get("type") in ["computer_call", "function_call"]:
                        has_actions = True
                    new_items += self.handle_item(item)

                # Track consecutive iterations without actions
                if not has_actions:
                    consecutive_no_actions += 1
                    if consecutive_no_actions >= 3:
                        print("⚠️ No actions for 3 consecutive iterations - stopping")
                        break
                else:
                    consecutive_no_actions = 0

            except Exception as error:
                print(f"❌ Error during task execution: {error}")
                raise

        if iterations >= max_iterations:
            print(f"⚠️ Task execution stopped after {max_iterations} iterations")

        assistant_messages = [item for item in new_items if item.get("role") == "assistant"]
        if assistant_messages:
            final_message = assistant_messages[-1]
            if final_message.get("content") and len(final_message["content"]) > 0:
                return final_message["content"][0].get("text", "Task execution completed (no final message)")

        return "Task execution completed (no final message)"
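
The two stop conditions used by execute_task, explicit completion markers and repeated messages, can be tried in isolation. The sketch below is a simplified standalone version under stated assumptions: `completion_reason` is a hypothetical helper that checks only the explicit markers (not the regex patterns), and `similarity` returns 0.0 when both messages are empty.

```python
def completion_reason(content: str) -> str:
    """Return why the message ends the task, or '' if it does not."""
    if "TASK_COMPLETED:" in content:
        return "explicit_completion"
    if "TASK_FAILED:" in content or "TASK_ABANDONED:" in content:
        return "explicit_failure"
    return ""


def similarity(first: str, second: str) -> float:
    """Share of words in `first` that also occur in `second`, relative to the longer message."""
    words_first = first.lower().split()
    words_second = second.lower().split()
    longest = max(len(words_first), len(words_second))
    if longest == 0:
        return 0.0  # nothing to compare
    common = [word for word in words_first if word in words_second]
    return len(common) / longest


print(completion_reason("TASK_COMPLETED: opened the article"))
print(completion_reason("Clicking the search box"))
print(similarity("I will click the search button", "I will click the search button now"))
print(similarity("open the page", "scroll down"))
```

With the 0.8 threshold used in the agent, the first pair of messages (6 shared words out of 7) counts as repetition, while the second pair does not.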

Step 4: Create the Main Script

Python
main.py
import os
import sys
import time

# Modules created in Steps 1-3
from agent import Agent
from steel_browser import SteelBrowser
from utils import OPENAI_API_KEY, STEEL_API_KEY, TASK


def main():
    print("🚀 Steel + OpenAI Computer Use Assistant")
    print("=" * 60)

    if STEEL_API_KEY == "your-steel-api-key-here":
        print("⚠️ WARNING: Please replace 'your-steel-api-key-here' with your actual Steel API key")
        print(" Get your API key at: https://app.steel.dev/settings/api-keys")
        return

    if OPENAI_API_KEY == "your-openai-api-key-here":
        print("⚠️ WARNING: Please replace 'your-openai-api-key-here' with your actual OpenAI API key")
        print(" Get your API key at: https://platform.openai.com/")
        return

    task = os.getenv("TASK") or TASK

    print("\nStarting Steel browser session...")

    try:
        with SteelBrowser() as computer:
            print("✅ Steel browser session started!")

            agent = Agent(
                computer=computer,
                auto_acknowledge_safety=True,
            )

            start_time = time.time()

            try:
                result = agent.execute_task(
                    task,
                    print_steps=True,
                    debug=False,
                    max_iterations=50,
                )

                duration = f"{(time.time() - start_time):.1f}"

                print("\n" + "=" * 60)
                print("🎉 TASK EXECUTION COMPLETED")
                print("=" * 60)
                print(f"⏱️ Duration: {duration} seconds")
                print(f"🎯 Task: {task}")
                print(f"📋 Result:\n{result}")
                print("=" * 60)

            except Exception as error:
                print(f"❌ Task execution failed: {error}")
                sys.exit(1)

    except Exception as e:
        print(f"❌ Failed to start Steel browser: {e}")
        print("Please check your STEEL_API_KEY and internet connection.")
        sys.exit(1)


if __name__ == "__main__":
    main()

Running Your Agent

Execute your script to start an interactive AI browser session:

Terminal
python main.py

You will see the session URL printed in the console. You can view the live browser session by opening this URL in your web browser.

The agent will execute the task defined in the TASK environment variable or the default task. You can modify the task by setting the environment variable:

Terminal
export TASK="Search for the latest news on artificial intelligence"
python main.py
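
The task lookup relies on the `os.getenv("TASK") or TASK` idiom, which falls back to the default when the variable is unset or set to an empty string. A minimal sketch of that behavior follows; `resolve_task` is a hypothetical helper that takes the environment as a plain mapping so it can be tried without changing real environment variables.

```python
from typing import Mapping

DEFAULT_TASK = "Go to Wikipedia and search for machine learning"


def resolve_task(env: Mapping[str, str]) -> str:
    """Mirror of `os.getenv("TASK") or TASK` for a given environment mapping."""
    return env.get("TASK") or DEFAULT_TASK


print(resolve_task({}))                                   # unset: default task
print(resolve_task({"TASK": ""}))                         # empty: default task
print(resolve_task({"TASK": "Search for the latest news on artificial intelligence"}))
```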

Next Steps

  • Explore the Steel API documentation for more advanced features

  • Check out the OpenAI documentation for more information about the computer-use-preview model

  • Add additional features like session recording or multi-session management