Workflows often need to handle files: downloading invoices, parsing spreadsheets, uploading documents to storage. This page covers the four core file operations: downloading, parsing, saving, and passing files between blocks.
This page uses workflow template syntax ({{ parameter }}, {{ label_output }}) and assumes familiarity with blocks. See Build a Workflow for an introduction.

Downloading files

When your workflow needs to retrieve documents from a website — invoices from a vendor portal, confirmation letters after form submission, compliance certificates — use file_download. Downloaded files land in SKYVERN_DOWNLOAD_DIRECTORY, a temporary directory created for each workflow run. Every file downloaded during a run accumulates here, and you can reference this directory in later blocks to upload or email the files.

Downloading a single file

Say you’ve just filed an SS-4 form on the IRS website and need to grab the EIN confirmation letter. The file_download block navigates the page and triggers the download:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Download EIN Letter",
            "workflow_definition": {
                "parameters": [],
                "blocks": [
                    {
                        "block_type": "file_download",
                        "label": "download_ein_letter",
                        "url": "https://sa.www4.irs.gov/modiein/individual/index.jsp",
                        "navigation_goal": (
                            "Find and download the EIN confirmation letter PDF. "
                            "Look for a Download or Print button. "
                            "COMPLETE when the download starts."
                        ),
                        "download_suffix": "ein_confirmation.pdf"
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow
    run = await client.run_workflow(workflow_id=workflow.workflow_permanent_id)
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
Three fields drive this block:
  • navigation_goal tells Skyvern what to do on the page — find the download button, click it, and mark the task complete when the download starts. Write this the way you’d explain the task to a colleague looking at the screen.
  • download_suffix names the downloaded file. Without it, you’d get a generic filename. Use a descriptive suffix so you can identify the file later when you upload or email it.
  • url sets the starting page. If your previous block already navigated to the right page, omit url and the download block continues from there.
| Parameter | Type | Use this to |
| --- | --- | --- |
| navigation_goal | string | Describe how to find and click the download button |
| url | string | Set the starting page (omit to continue from previous block) |
| download_suffix | string | Set the filename for the downloaded file |
| download_timeout | number | Allow more time for large files (in seconds) |
| parameter_keys | array | List workflow parameters this block can access |
| max_retries | integer | Retry the block on failure |
| max_steps_per_run | integer | Limit how many actions the block can take |
| engine | string | Set the AI engine (skyvern-1.0 or skyvern-2.0) |
For the full parameter list, see File Download in the blocks reference.

Downloading multiple files

When you need to download a batch of files (say, all monthly invoices from a vendor portal), pair a for_loop with file_download. A prior block (like an extraction) returns a list of URLs, and the loop downloads each one. In the example below, get_invoices is an extraction block that scrapes invoice URLs from the portal, and the loop iterates over its output:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Download All Invoices",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "portal_url",
                        "parameter_type": "workflow",
                        "workflow_parameter_type": "string"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "extraction",
                        "label": "get_invoices",
                        "url": "{{ portal_url }}",
                        "data_extraction_goal": "Extract all invoice download URLs from this page"
                    },
                    {
                        "block_type": "for_loop",
                        "label": "download_all_invoices",
                        "loop_over_parameter_key": "get_invoices_output",
                        "continue_on_failure": True,
                        "loop_blocks": [
                            {
                                "block_type": "file_download",
                                "label": "download_invoice",
                                "url": "{{ download_invoice.current_value }}",
                                "navigation_goal": "Download the invoice PDF from this page"
                            }
                        ]
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow with parameters
    run = await client.run_workflow(
        workflow_id=workflow.workflow_permanent_id,
        parameters={"portal_url": "https://vendor.example.com/invoices"}
    )
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
  • loop_over_parameter_key points to the output of the prior extraction block (get_invoices_output).
  • {{ download_invoice.current_value }} gives you one URL per iteration — the inner block’s label (download_invoice) followed by .current_value.
  • continue_on_failure: true on the loop means a single failed download won’t stop the rest. All successful downloads still land in SKYVERN_DOWNLOAD_DIRECTORY.
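The loop's failure behavior can be sketched in plain Python (a conceptual model of continue_on_failure, not Skyvern's implementation):

```python
def run_loop(values, download, continue_on_failure=True):
    """Apply `download` to each value, optionally surviving failures."""
    results, failures = [], []
    for value in values:
        try:
            results.append(download(value))
        except Exception as exc:
            failures.append((value, exc))
            if not continue_on_failure:
                raise  # without the flag, one failure stops the whole loop
    return results, failures
```

With the flag set, failed iterations are recorded and the remaining downloads still run to completion.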

Parsing files

When your workflow needs to act on information inside a file — filling forms with resume data, processing orders from a spreadsheet, extracting fields from a PDF — use file_url_parser. It downloads the file, parses it, and returns structured data you can use in subsequent blocks.

Parsing a PDF with a schema

Say you’re building a workflow that applies to jobs on Lever. You have a candidate’s resume as a PDF URL and need to extract their name, email, and work history so a later navigation block can fill the application form. Define the fields you want in a json_schema, and Skyvern uses an LLM to extract them:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Parse Resume",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "resume",
                        "parameter_type": "workflow",
                        "workflow_parameter_type": "file_url",
                        "description": "URL to the resume PDF"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "file_url_parser",
                        "label": "parse_resume",
                        "file_url": "{{ resume }}",
                        "file_type": "pdf",
                        "json_schema": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "email": {"type": "string"},
                                "work_experience": {
                                    "type": "array",
                                    "items": {
                                        "type": "object",
                                        "properties": {
                                            "company": {"type": "string"},
                                            "role": {"type": "string"}
                                        }
                                    }
                                }
                            }
                        }
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow with a resume URL
    run = await client.run_workflow(
        workflow_id=workflow.workflow_permanent_id,
        parameters={
            "resume": "https://writing.colostate.edu/guides/documents/resume/functionalsample.pdf"
        }
    )
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
  • file_url points to the file. Here, {{ resume }} is a workflow parameter passed at runtime — it could be a public URL, an S3 presigned URL, or a Google Drive link.
  • file_type tells Skyvern how to read the file. Use csv, excel, pdf, image, or docx.
  • json_schema defines exactly what to extract. The LLM reads the PDF and returns data matching this structure. The work_experience array means you’ll get one object per job with company and role fields.
The output is available as {{ parse_resume_output }} in subsequent blocks and looks like this:
{
  "name": "Jane Doe",
  "email": "jane@example.com",
  "work_experience": [
    { "company": "Acme Corp", "role": "Software Engineer" },
    { "company": "Globex Inc", "role": "Tech Lead" }
  ]
}
Without a json_schema, you get the raw parsed content instead — plain text for PDFs, or an array of row objects for CSVs. The schema is what turns unstructured file content into typed, structured data.
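Before handing the parsed result to a later block, a shallow local check that the expected fields came back can catch extraction misses early (a minimal sketch; it only checks top-level properties against the schema above):

```python
def matches_schema(data, schema):
    """Shallow check: every top-level property exists with the declared type."""
    type_map = {"string": str, "number": (int, float), "array": list, "object": dict}
    for key, spec in schema.get("properties", {}).items():
        if key not in data:
            return False
        expected = type_map.get(spec.get("type"))
        if expected is not None and not isinstance(data[key], expected):
            return False
    return True
```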
| Parameter | Type | Use this to |
| --- | --- | --- |
| file_url | string | Point to the file (HTTP URL, S3 URI, or parameter like {{ resume }}) |
| file_type | string | Specify format: csv, excel, pdf, image, or docx |
| json_schema | object | Define the structure you want extracted |
Supported sources for file_url:
  • HTTP/HTTPS URLs
  • S3 URIs (s3://bucket/path/file.csv)
  • Azure Blob URIs (azure://container/path/file.xlsx)
  • Google Drive links (auto-converted)
  • Workflow parameters ({{ parameter_name }})

Parsing a CSV and processing each row

Say your procurement team exports purchase orders as a CSV and you need to enter each one into your web-based ERP. First parse the CSV, then loop over the rows:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Import Purchase Orders",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "orders_csv",
                        "parameter_type": "workflow",
                        "workflow_parameter_type": "file_url"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "file_url_parser",
                        "label": "parse_orders",
                        "file_url": "{{ orders_csv }}",
                        "file_type": "csv"
                    },
                    {
                        "block_type": "for_loop",
                        "label": "process_orders",
                        "loop_over_parameter_key": "parse_orders_output",
                        "loop_blocks": [
                            {
                                "block_type": "navigation",
                                "label": "enter_order",
                                "url": "https://erp.example.com/orders/new",
                                "navigation_goal": "Create a new order with: {{ enter_order.current_value }}"
                            }
                        ]
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow with a CSV URL
    run = await client.run_workflow(
        workflow_id=workflow.workflow_permanent_id,
        parameters={
            "orders_csv": "https://example.com/purchase_orders.csv"
        }
    )
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
CSV parsing returns an array of objects — one per row, with column headers as keys. No json_schema needed for CSVs since the column headers already provide structure:
[
  { "order_id": "ORD-001", "product": "Widget", "quantity": "10" },
  { "order_id": "ORD-002", "product": "Gadget", "quantity": "5" }
]
The for_loop iterates over this array. On each iteration, {{ enter_order.current_value }} contains one row object (e.g., { "order_id": "ORD-001", "product": "Widget", "quantity": "10" }), which the navigation block uses to fill the ERP form.
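Conceptually, each iteration substitutes one row into the template, which can be modeled in plain Python (a sketch of the templating, not Skyvern internals):

```python
# Rows as returned by CSV parsing: one dict per row, headers as keys
rows = [
    {"order_id": "ORD-001", "product": "Widget", "quantity": "10"},
    {"order_id": "ORD-002", "product": "Gadget", "quantity": "5"},
]

# One rendered navigation_goal per row, as the for_loop produces
# for the inner enter_order block
goals = [f"Create a new order with: {current_value}" for current_value in rows]
```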

Saving files

When your workflow needs to store downloaded files permanently — archiving invoices, sending reports to a shared bucket, integrating with other systems — use one of the upload blocks.
| Block | When to use it |
| --- | --- |
| upload_to_s3 | Store files in Skyvern's managed S3. Simplest option (no credentials needed). |
| file_upload | Store files in your own S3 bucket or Azure Blob Storage. Use when you need files in your infrastructure. |
| download_to_s3 | Save a file from a URL directly to Skyvern's S3, skipping the browser. Use when you already have the file URL. |

Upload to Skyvern’s S3

After downloading EIN confirmation letters from the IRS, archive them to Skyvern’s managed S3. Pass SKYVERN_DOWNLOAD_DIRECTORY as the path and the block uploads every file downloaded during this run:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Archive Downloads to S3",
            "workflow_definition": {
                "parameters": [],
                "blocks": [
                    {
                        "block_type": "upload_to_s3",
                        "label": "save_invoices",
                        "path": "SKYVERN_DOWNLOAD_DIRECTORY"
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow
    run = await client.run_workflow(workflow_id=workflow.workflow_permanent_id)
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
The output ({{ save_invoices_output }}) is a list of S3 URIs you can return from your workflow or pass to subsequent blocks.
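Reading those URIs back on the client side might look like the following sketch (the exact layout of result.output is an assumption here; check what your run actually returns):

```python
def get_block_output(output, label):
    """Look up one block's output in the run result.

    Assumes the convention shown on this page: a block's output is
    exposed under "<label>_output". The real layout may differ.
    """
    if not isinstance(output, dict):
        return None
    return output.get(f"{label}_output")

# Illustrative run output (values are made up for the example)
output = {"save_invoices_output": ["s3://skyvern-uploads/run_123/ein_confirmation.pdf"]}
uris = get_block_output(output, "save_invoices") or []
for uri in uris:
    print(uri)
```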
| Parameter | Type | Use this to |
| --- | --- | --- |
| path | string | Specify what to upload (SKYVERN_DOWNLOAD_DIRECTORY or a specific file path) |
Limit: 50 files maximum per upload.

Upload to your own storage

When compliance certificates need to land in your company’s own S3 bucket (not Skyvern’s), use file_upload with your AWS credentials:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow with AWS Secrets Manager-backed credentials
    workflow = await client.create_workflow(
        json_definition={
            "title": "Upload to Company S3",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "aws_key",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/aws/access_key_id"
                    },
                    {
                        "key": "aws_secret",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/aws/secret_access_key"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "file_upload",
                        "label": "upload_to_company_bucket",
                        "storage_type": "s3",
                        "s3_bucket": "company-documents",
                        "aws_access_key_id": "{{ aws_key }}",
                        "aws_secret_access_key": "{{ aws_secret }}",
                        "region_name": "us-west-2",
                        "path": "SKYVERN_DOWNLOAD_DIRECTORY"
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow (credentials auto-resolve from AWS Secrets Manager)
    run = await client.run_workflow(workflow_id=workflow.workflow_permanent_id)
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
  • storage_type selects the destination: s3 for AWS, azure for Azure Blob Storage.
  • Credentials should use aws_secret parameters backed by AWS Secrets Manager — credentials auto-resolve at runtime without being passed in the run payload.
  • path works the same as upload_to_s3 — pass SKYVERN_DOWNLOAD_DIRECTORY to upload everything, or a specific file path.
| Parameter | Type | Use this to |
| --- | --- | --- |
| storage_type | string | Choose s3 or azure |
| path | string | Specify what to upload (SKYVERN_DOWNLOAD_DIRECTORY or a file path) |
S3 parameters: s3_bucket, aws_access_key_id, aws_secret_access_key, region_name
Azure parameters: azure_storage_account_name, azure_storage_account_key, azure_blob_container_name, azure_folder_path
Limit: 50 files maximum per upload.

Quick URL-to-S3

When you already have a direct URL to a file — say, a Google Analytics export link — and just need to save it without opening a browser, use download_to_s3:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Save Report to S3",
            "workflow_definition": {
                "parameters": [],
                "blocks": [
                    {
                        "block_type": "download_to_s3",
                        "label": "save_report",
                        "url": "https://analytics.example.com/report.pdf"
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow
    run = await client.run_workflow(workflow_id=workflow.workflow_permanent_id)
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
| Parameter | Type | Use this to |
| --- | --- | --- |
| url | string | Specify the file URL to download and save |
This skips the browser entirely — it fetches the file directly and stores it in Skyvern’s S3. Use it when you already have the URL and don’t need Skyvern to navigate a page. Limit: 10 MB maximum file size.
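Because download_to_s3 rejects files over 10 MB, it can be worth checking the size up front with a HEAD request before picking it over file_download (a stdlib sketch; servers that omit Content-Length will report as over-limit here):

```python
from urllib.request import Request, urlopen

DOWNLOAD_TO_S3_LIMIT = 10 * 1024 * 1024  # 10 MB

def within_limit(content_length, limit=DOWNLOAD_TO_S3_LIMIT):
    """True when a reported Content-Length fits under the limit."""
    return content_length is not None and int(content_length) <= limit

def url_fits_limit(url):
    # HEAD request: read only the headers, never the body
    with urlopen(Request(url, method="HEAD")) as resp:
        return within_limit(resp.headers.get("Content-Length"))
```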

Passing files between blocks

Files flow through workflows in two ways: through SKYVERN_DOWNLOAD_DIRECTORY for downloaded files, and through output parameters for parsed data.

Using downloaded files

Say you downloaded invoices from a vendor portal earlier in the workflow and now need to email them to your accounting team. Any file downloaded during the run sits in SKYVERN_DOWNLOAD_DIRECTORY, and you can attach the entire directory to an email:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Email Downloaded Invoices",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "smtp_host",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/smtp/host"
                    },
                    {
                        "key": "smtp_port",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/smtp/port"
                    },
                    {
                        "key": "smtp_username",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/smtp/username"
                    },
                    {
                        "key": "smtp_password",
                        "parameter_type": "aws_secret",
                        "aws_key": "skyvern/smtp/password"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "send_email",
                        "label": "email_documents",
                        "sender": "automation@company.com",
                        "recipients": ["accounting@company.com"],
                        "subject": "Monthly invoices attached",
                        "body": "See attached invoices for this month.",
                        "smtp_host_secret_parameter_key": "smtp_host",
                        "smtp_port_secret_parameter_key": "smtp_port",
                        "smtp_username_secret_parameter_key": "smtp_username",
                        "smtp_password_secret_parameter_key": "smtp_password",
                        "file_attachments": ["SKYVERN_DOWNLOAD_DIRECTORY"]
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow (SMTP secrets are auto-resolved from AWS Secrets Manager)
    run = await client.run_workflow(workflow_id=workflow.workflow_permanent_id)
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
file_attachments: ["SKYVERN_DOWNLOAD_DIRECTORY"] attaches every file downloaded during the run. The SMTP credentials use aws_secret parameters — these reference keys stored in AWS Secrets Manager so your credentials are never exposed in the workflow definition.

Using parsed data

When you parse a file, the extracted data becomes available as {{ label_output }}. This is how you chain a parse step with a navigation step — for example, parsing a resume PDF and then using the candidate’s info to fill a Lever job application:
import os
import asyncio
from skyvern import Skyvern

async def main():
    client = Skyvern(api_key=os.getenv("SKYVERN_API_KEY"))

    # Create the workflow
    workflow = await client.create_workflow(
        json_definition={
            "title": "Parse Resume and Apply",
            "workflow_definition": {
                "parameters": [
                    {
                        "key": "resume",
                        "parameter_type": "workflow",
                        "workflow_parameter_type": "file_url"
                    },
                    {
                        "key": "job_url",
                        "parameter_type": "workflow",
                        "workflow_parameter_type": "string"
                    }
                ],
                "blocks": [
                    {
                        "block_type": "file_url_parser",
                        "label": "parse_resume",
                        "file_url": "{{ resume }}",
                        "file_type": "pdf"
                    },
                    {
                        "block_type": "navigation",
                        "label": "fill_application",
                        "url": "{{ job_url }}",
                        "navigation_goal": (
                            "Fill out the application form using "
                            "this candidate's information:\n\n"
                            "{{ parse_resume_output }}"
                        )
                    }
                ]
            }
        }
    )
    print(f"Workflow ID: {workflow.workflow_permanent_id}")

    # Run the workflow with a resume and job URL
    run = await client.run_workflow(
        workflow_id=workflow.workflow_permanent_id,
        parameters={
            "resume": "https://writing.colostate.edu/guides/documents/resume/functionalsample.pdf",
            "job_url": "https://jobs.lever.co/company/position"
        }
    )
    print(f"Run ID: {run.run_id}")

    # Poll until complete
    while True:
        result = await client.get_run(run.run_id)
        if result.status in ["completed", "failed", "terminated", "timed_out", "canceled"]:
            break
        print(f"Status: {result.status}")
        await asyncio.sleep(5)

    print(f"Final status: {result.status}")
    print(f"Output: {result.output}")

asyncio.run(main())
The first block parses the resume and stores the result. The second block references {{ parse_resume_output }} in its navigation_goal — Skyvern replaces this with the full parsed data (name, email, work experience) and uses it to fill the form fields.
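The substitution Skyvern performs for {{ parse_resume_output }} is roughly equivalent to serializing the parsed data into the goal text (a plain-Python sketch, not the actual templating engine):

```python
import json

# Illustrative parsed output, matching the example earlier on this page
parse_resume_output = {
    "name": "Jane Doe",
    "email": "jane@example.com",
    "work_experience": [
        {"company": "Acme Corp", "role": "Software Engineer"},
    ],
}

# The rendered navigation_goal the second block would receive
goal = (
    "Fill out the application form using this candidate's information:\n\n"
    + json.dumps(parse_resume_output, indent=2)
)
```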

Accepting files as workflow inputs

To accept a file when the workflow runs, define a parameter with workflow_parameter_type: file_url:
workflow = await client.create_workflow(
    json_definition={
        "title": "Resume Parser",
        "workflow_definition": {
            "parameters": [
                {
                    "key": "resume",
                    "parameter_type": "workflow",
                    "workflow_parameter_type": "file_url",
                    "description": "URL to the applicant's resume"
                }
            ],
            "blocks": [...]
        }
    }
)
When running the workflow, pass any accessible URL: public links, S3 presigned URLs, or files in your own storage.

Next steps

Workflow Blocks Reference

Complete parameter reference for all file blocks

Build a Workflow

Learn how blocks pass data to each other