Skyvern Python SDK — API Reference

Install and initialize

pip install skyvern
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key="YOUR_API_KEY")
    result = await client.run_task(
        prompt="Get the title of the top post on Hacker News",
        url="https://news.ycombinator.com",
        wait_for_completion=True,
    )
    print(result.output)

asyncio.run(main())
Constructor:
Skyvern(
    api_key: str,                          # Required
    base_url: str | None = None,           # Override for self-hosted deployments
    environment: SkyvernEnvironment = CLOUD,  # CLOUD or LOCAL
    timeout: float | None = None,          # HTTP request timeout (seconds)
)
Local mode (runs entirely on your machine, reads .env):
client = Skyvern.local()
All methods are async. Use await inside async def, with asyncio.run() as entry point.

Imports

from skyvern import Skyvern
from skyvern.client import SkyvernEnvironment              # CLOUD, LOCAL
from skyvern.client.core import ApiError, RequestOptions   # Base error, per-request config
from skyvern.client.errors import (                        # HTTP error subclasses
    BadRequestError,       # 400
    ForbiddenError,        # 403
    NotFoundError,         # 404
    ConflictError,         # 409
    UnprocessableEntityError,  # 422
)
from skyvern.schemas.runs import RunEngine                 # skyvern_v1, skyvern_v2, openai_cua, anthropic_cua, ui_tars
from skyvern.schemas.run_blocks import CredentialType      # skyvern, bitwarden, onepassword, azure_vault
Important: ApiError lives in skyvern.client.core, not skyvern.client.errors. The subclasses (NotFoundError, etc.) live in skyvern.client.errors.

Tasks

run_task

result = await client.run_task(
    prompt: str,                                # Required. Natural language instructions.
    url: str | None = None,                     # Starting page URL.
    engine: RunEngine = RunEngine.skyvern_v2,   # AI engine.
    wait_for_completion: bool = False,          # Block until finished.
    timeout: float = 1800,                      # Max wait (seconds). Only with wait_for_completion.
    max_steps: int | None = None,               # Cap AI steps to limit cost.
    data_extraction_schema: dict | str | None = None,  # JSON Schema constraining output shape.
    browser_session_id: str | None = None,      # Run in existing session.
    publish_workflow: bool = False,             # Save generated code as reusable workflow.
    proxy_location: ProxyLocation | None = None,
    webhook_url: str | None = None,
    error_code_mapping: dict[str, str] | None = None,
    totp_identifier: str | None = None,
    totp_url: str | None = None,
    title: str | None = None,
    user_agent: str | None = None,
    extra_http_headers: dict[str, str] | None = None,
    browser_address: str | None = None,
) -> TaskRunResponse
TaskRunResponse fields:
  run_id (str)                              — Unique ID (tsk_...).
  status (str)                              — created | queued | running | completed | failed | terminated | timed_out | canceled
  output (dict | list | None)               — Extracted data. None until completed.
  failure_reason (str | None)               — Error description if failed.
  downloaded_files (list[FileInfo] | None)  — Files downloaded during the run.
  recording_url (str | None)                — Session recording video URL.
  screenshot_urls (list[str] | None)        — Final screenshots.
  app_url (str | None)                      — Link to run in Skyvern UI.
  step_count (int | None)                   — Number of AI steps taken.
  created_at (datetime)                     — When the run was created.
  finished_at (datetime | None)             — When the run finished.

get_run

run = await client.get_run(run_id: str) -> GetRunResponse
Returns status and results for any run (task or workflow).

cancel_run

await client.cancel_run(run_id: str)

get_run_timeline

timeline = await client.get_run_timeline(run_id: str) -> list[WorkflowRunTimeline]

get_run_artifacts

artifacts = await client.get_run_artifacts(
    run_id: str,
    artifact_type: ArtifactType | list[ArtifactType] | None = None,
) -> list[Artifact]

get_artifact

artifact = await client.get_artifact(artifact_id: str) -> Artifact

retry_run_webhook

await client.retry_run_webhook(run_id: str)

Workflows

run_workflow

result = await client.run_workflow(
    workflow_id: str,                       # Required. Permanent ID (wpid_...).
    parameters: dict | None = None,         # Input params matching workflow definition.
    wait_for_completion: bool = False,
    timeout: float = 1800,
    run_with: str | None = None,            # "code" (cached Playwright) or "agent" (AI).
    ai_fallback: bool | None = None,        # Fall back to AI if code fails.
    browser_session_id: str | None = None,
    browser_profile_id: str | None = None,  # Load saved browser state.
    proxy_location: ProxyLocation | None = None,
    max_steps_override: int | None = None,
    webhook_url: str | None = None,
    title: str | None = None,
    template: bool | None = None,
    totp_identifier: str | None = None,
    totp_url: str | None = None,
    user_agent: str | None = None,
    extra_http_headers: dict[str, str] | None = None,
    browser_address: str | None = None,
) -> WorkflowRunResponse
WorkflowRunResponse fields: Same as TaskRunResponse plus run_with, ai_fallback, script_run.

create_workflow

workflow = await client.create_workflow(
    json_definition: dict | None = None,
    yaml_definition: str | None = None,
    folder_id: str | None = None,
) -> Workflow
Provide either json_definition or yaml_definition. Example:
workflow = await client.create_workflow(
    json_definition={
        "title": "Extract Products",
        "workflow_definition": {
            "parameters": [
                {
                    "key": "target_url",
                    "parameter_type": "workflow",
                    "workflow_parameter_type": "string",
                    "description": "URL to scrape",
                }
            ],
            "blocks": [
                {
                    "block_type": "task",
                    "label": "extract",
                    "prompt": "Extract the top 3 products with name and price",
                    "url": "{{ target_url }}",
                }
            ],
        },
    },
)
print(workflow.workflow_permanent_id)  # wpid_... — use this to run it
Workflow fields: workflow_id, workflow_permanent_id, version, title, workflow_definition, status, created_at

get_workflows

workflows = await client.get_workflows(
    page: int | None = None,
    page_size: int | None = None,
    only_saved_tasks: bool | None = None,
    only_workflows: bool | None = None,
    title: str | None = None,
    search_key: str | None = None,
    folder_id: str | None = None,
    status: WorkflowStatus | list[WorkflowStatus] | None = None,
) -> list[Workflow]

get_workflow

workflow = await client.get_workflow(
    workflow_permanent_id: str,
    version: int | None = None,
    template: bool | None = None,
) -> Workflow

get_workflow_versions

versions = await client.get_workflow_versions(
    workflow_permanent_id: str,
    template: bool | None = None,
) -> list[Workflow]

update_workflow

updated = await client.update_workflow(
    workflow_id: str,          # The permanent ID (wpid_...).
    json_definition: dict | None = None,
    yaml_definition: str | None = None,
) -> Workflow

delete_workflow

await client.delete_workflow(workflow_id: str)

Browser sessions

A persistent browser instance that stays alive between API calls. Use to chain tasks without losing cookies or login state.

create_browser_session

session = await client.create_browser_session(
    timeout: int | None = 60,               # Minutes (5-1440).
    proxy_location: ProxyLocation | None = None,
    extensions: list[Extensions] | None = None,      # "ad-blocker", "captcha-solver"
    browser_type: PersistentBrowserType | None = None, # "chrome", "msedge"
) -> BrowserSessionResponse
BrowserSessionResponse fields: browser_session_id, status, browser_address, app_url, timeout, started_at, created_at

get_browser_session

session = await client.get_browser_session(browser_session_id: str) -> BrowserSessionResponse

get_browser_sessions

sessions = await client.get_browser_sessions() -> list[BrowserSessionResponse]

close_browser_session

await client.close_browser_session(browser_session_id: str)

Session chaining example

session = await client.create_browser_session()

# Step 1: Log in
await client.run_task(
    prompt="Log in with username demo@example.com",
    url="https://app.example.com/login",
    browser_session_id=session.browser_session_id,
    wait_for_completion=True,
)

# Step 2: Extract data (same browser, already logged in)
result = await client.run_task(
    prompt="Go to the invoices page and extract all invoice numbers",
    browser_session_id=session.browser_session_id,
    wait_for_completion=True,
)
print(result.output)

# Clean up
await client.close_browser_session(session.browser_session_id)

Browser profiles

A saved snapshot of browser state (cookies, local storage). Persists indefinitely. Create from a completed workflow run, then reuse to skip login.

create_browser_profile

profile = await client.create_browser_profile(
    name: str,
    description: str | None = None,
    workflow_run_id: str | None = None,    # Run must have used persist_browser_session=True.
    browser_session_id: str | None = None, # One of workflow_run_id or browser_session_id is required.
) -> BrowserProfile
BrowserProfile fields: browser_profile_id, name, description, created_at

list_browser_profiles

profiles = await client.list_browser_profiles(
    include_deleted: bool | None = None,
) -> list[BrowserProfile]

get_browser_profile

profile = await client.get_browser_profile(profile_id: str) -> BrowserProfile

delete_browser_profile

await client.delete_browser_profile(profile_id: str)

Profile workflow example

# Step 1: Run a login workflow
run = await client.run_workflow(
    workflow_id="wpid_login_flow",
    parameters={"username": "demo@example.com"},
    wait_for_completion=True,
)

# Step 2: Create a profile from the run
profile = await client.create_browser_profile(
    name="demo-account-login",
    workflow_run_id=run.run_id,
)

# Step 3: Use the profile in future runs (skip login)
result = await client.run_workflow(
    workflow_id="wpid_extract_invoices",
    browser_profile_id=profile.browser_profile_id,
    wait_for_completion=True,
)

Credentials

Store login information securely. Reference by ID instead of passing secrets in code.

create_credential

credential = await client.create_credential(
    name: str,
    credential_type: CredentialType,       # e.g. "password"
    credential: dict,                      # Password: {"username": "...", "password": "..."}
) -> CredentialResponse

get_credentials

creds = await client.get_credentials(
    page: int | None = None,
    page_size: int | None = None,
) -> list[CredentialResponse]

get_credential

cred = await client.get_credential(credential_id: str) -> CredentialResponse

delete_credential

await client.delete_credential(credential_id: str)

send_totp_code

Send a TOTP code to Skyvern during a run that requires 2FA.
await client.send_totp_code(
    totp_identifier: str,
    content: str,               # The TOTP code value.
    task_id: str | None = None,
    workflow_id: str | None = None,
    workflow_run_id: str | None = None,
    source: str | None = None,
    expired_at: datetime | None = None,
) -> TotpCode

Helper methods

login

Automate logging into a website using stored credentials.
from skyvern.schemas.run_blocks import CredentialType

result = await client.login(
    credential_type: CredentialType,        # Required. skyvern, bitwarden, onepassword, azure_vault.
    url: str | None = None,
    credential_id: str | None = None,       # When using CredentialType.skyvern.
    prompt: str | None = None,
    browser_session_id: str | None = None,
    wait_for_completion: bool = False,
    timeout: float = 1800,
    # Bitwarden: bitwarden_collection_id, bitwarden_item_id
    # 1Password: onepassword_vault_id, onepassword_item_id
    # Azure: azure_vault_name, azure_vault_username_key, azure_vault_password_key, azure_vault_totp_secret_key
) -> WorkflowRunResponse

download_files

Does not support wait_for_completion. Returns immediately — poll with get_run().
result = await client.download_files(
    navigation_goal: str,                   # Required. What to download.
    url: str | None = None,
    browser_session_id: str | None = None,
    browser_profile_id: str | None = None,
    download_suffix: str | None = None,     # Expected extension, e.g. ".pdf"
    download_timeout: float | None = None,
    max_steps_per_run: int | None = None,
) -> WorkflowRunResponse

upload_file

with open("data.csv", "rb") as f:
    upload = await client.upload_file(file=f)
    print(upload.s3uri)          # s3://skyvern-uploads/...
    print(upload.presigned_url)  # https://...signed download URL
Returns UploadFileResponse with fields: s3uri, presigned_url.

Error handling

from skyvern.client.core import ApiError
from skyvern.client.errors import NotFoundError

try:
    run = await client.get_run("tsk_nonexistent")
except NotFoundError as e:
    print(e.status_code, e.body)    # 404
except ApiError as e:
    print(e.status_code, e.body)    # Any other HTTP error
Error types: BadRequestError (400), ForbiddenError (403), NotFoundError (404), ConflictError (409), UnprocessableEntityError (422). All inherit from ApiError. Completion timeout raises Python’s built-in TimeoutError:
try:
    result = await client.run_task(
        prompt="...", url="...", wait_for_completion=True, timeout=300,
    )
except TimeoutError:
    print("Task didn't complete in time")
Run failure is not an exception — check result.status:
if result.status == "failed":
    print(result.failure_reason)
elif result.status == "completed":
    print(result.output)

Request options

Override timeout, retries, or headers per-request:
from skyvern.client.core import RequestOptions

result = await client.run_task(
    prompt="...",
    url="...",
    request_options=RequestOptions(
        timeout_in_seconds=120,
        max_retries=3,
        additional_headers={"x-custom-header": "value"},
    ),
)

Polling pattern

When not using wait_for_completion:
import asyncio

task = await client.run_task(prompt="...", url="...")

while True:
    run = await client.get_run(task.run_id)
    if run.status in ("completed", "failed", "terminated", "timed_out", "canceled"):
        break
    await asyncio.sleep(5)

print(run.output)

Key constraints

  • browser_profile_id works with run_workflow only — silently ignored by run_task.
  • download_files does not support wait_for_completion — poll manually or use webhooks.
  • Only workflow runs with persist_browser_session=True produce archives for profile creation.
  • Session archiving is async — profile creation may need a short retry delay after a run completes.
  • engine accepts RunEngine enum values: skyvern_v1, skyvern_v2, openai_cua, anthropic_cua, ui_tars.