What Are Task Queues?

Task queues are a great fit for resource-intensive functions on Beam that don't need to return a response right away.

Instead of processing tasks immediately, the task queue enables you to add tasks to a queue and process them later, either sequentially or concurrently.

Creating a Task Queue

You can run any function as a task queue by using the task_queue decorator:

from beam import task_queue, Output


@task_queue(cpu=1.0, memory=128)
def handler():
    result = 839 * 18

    # Save the result to a text file
    file_name = "result.txt"
    with open(file_name, "w") as f:
        f.write(f"The result is: {result}")

    # Upload task result to Beam to retrieve later
    Output(path=file_name).save()

You’ll be able to access the result.txt file when the task completes.

Endpoints vs. Task Queues

Endpoints are RESTful APIs designed for synchronous tasks that complete in 180 seconds or less. For longer-running tasks, you'll want to use an async task_queue instead.

Sending Async Requests

Because task queues run asynchronously, the API will return a Task ID.

Example Request

Request
  curl -X POST "https://9655d778-58c2-4c5d-8c55-03735b63607e.app.beam.cloud" \
   -H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
   -H 'Content-Type: application/json' \
   -d '{}'

Example Response

Response
{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }
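The same request can be sent from Python. The sketch below builds the POST request from the curl example with the standard library; the URL and token are placeholders you'd replace with your own deployment's values:

```python
import json
import urllib.request

# Placeholder values -- substitute your own deployment URL and auth token.
TASK_QUEUE_URL = "https://9655d778-58c2-4c5d-8c55-03735b63607e.app.beam.cloud"
AUTH_TOKEN = "[YOUR_AUTH_TOKEN]"


def build_request(url: str, token: str, payload: dict) -> urllib.request.Request:
    """Build the same POST request as the curl example above."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def enqueue(url: str, token: str, payload: dict) -> str:
    """Send the request and return the task ID from Beam's response."""
    with urllib.request.urlopen(build_request(url, token, payload)) as resp:
        return json.loads(resp.read())["task_id"]
```

Calling enqueue(TASK_QUEUE_URL, AUTH_TOKEN, {}) returns the task_id string, which you can pass to the /task API later.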

Viewing Task Responses

Because task_queue is async, you will need to make a separate API call to retrieve the task output.

Saving and Returning Output Files

You can save files using Beam’s Output class.

The code below saves a file and wraps it in an Output, which uploads it to Beam so it can be retrieved later:

app.py
from beam import task_queue, Output


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="A10G",
    callback_url="https://webhook.site/9b74f73d-9ec1-4c8e-adcc-07c78aafab6d",
)
def handler():
    total = 839 * 18

    # Write the result to a new text file
    file_name = "sum.txt"
    with open(file_name, "w") as f:
        f.write(f"The sum is: {total}")

    # Upload the file to Beam storage so it can be retrieved later
    output_file = Output(path=file_name)
    output_file.save()

Retrieving Results

There are two ways to retrieve response payloads:

  1. Webhooks: Beam sends a request to your server, based on the callback_url in your decorator
  2. Polling: save an Output and call the /task API

Webhooks

If you’ve added a callback_url to your decorator, Beam will fire a webhook to your server with the task response when it completes:

{
  "data": {
    "url": "https://app.beam.cloud/output/id/00894876-38df-42c8-a098-879db17e1bf8"
  }
}
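A minimal receiver for this webhook can be sketched with the standard library. The output URL lives under data.url in the payload; the handler below extracts and prints it (the server line is commented out so the sketch doesn't block):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def output_url(payload: dict) -> str:
    """Pull the output URL out of Beam's webhook payload."""
    return payload["data"]["url"]


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and parse the JSON body Beam sends on task completion
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        print("Task output available at:", output_url(payload))
        self.send_response(200)
        self.end_headers()


# To listen locally:
# HTTPServer(("", 8000), WebhookHandler).serve_forever()
```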

For testing purposes, you can set up a temporary webhook URL using https://webhook.site.

Polling for Results

Output payloads can be retrieved by polling the task API:

curl -X GET \
  'https://api.beam.cloud/v2/task/{TASK_ID}/' \
  -H 'Authorization: Bearer [YOUR_AUTH_TOKEN]' \
  -H 'Content-Type: application/json'

Your Output will be available in the outputs list in the response:

{
  "id": "828a5f6b-0852-44cb-97dc-3c2105b745d3",
  "started_at": "2025-05-22T23:19:58.995396Z",
  "ended_at": "2025-05-22T23:19:59.061813Z",
  "status": "COMPLETE",
  "container_id": "taskqueue-2365b036-39df-408f-946f-b25025d1251a-bf09bf62",
  "updated_at": "2025-05-22T23:19:59.063168Z",
  "created_at": "2025-05-22T23:19:58.950594Z",
  "outputs": [
    {
      "name": "sum.txt",
      "url": "https://app.beam.cloud/output/id/c339b459-34de-4f0c-adb9-8be7c20951ce",
      "expires_in": 3600
    }
  ],
  "stats": {
    "active_containers": 1,
    "queue_depth": 0
  }
}
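To wait for completion in code, you can poll this endpoint until the status reaches a terminal state. Below is a sketch where the HTTP call is injected as a fetch callable (in real use, an authenticated GET against the task API), so the polling logic itself is library-agnostic. COMPLETE is the terminal status shown above; treating FAILED as terminal is an assumption:

```python
import time

# The task API endpoint from the curl example above
TASK_API = "https://api.beam.cloud/v2/task/{task_id}/"


def poll_task(fetch, task_id: str, interval: float = 2.0, timeout: float = 120.0) -> dict:
    """Poll the task API until the task reaches a terminal status.

    `fetch` maps a URL to a parsed JSON dict -- in real use, an
    authenticated GET request. Returns the full task payload,
    including the `outputs` list.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        task = fetch(TASK_API.format(task_id=task_id))
        # COMPLETE appears in the response above; FAILED is assumed terminal
        if task["status"] in ("COMPLETE", "FAILED"):
            return task
        time.sleep(interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")
```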

Retry Behavior

Task Queues include a built-in retry system. If a task fails for any reason, such as an out-of-memory error or an application exception, it will be retried three times before automatically moving to a failed state.

Programmatically Enqueuing Tasks

You can interact with the task queue either through the API (when deployed) or directly in Python through the .put() method.

This is useful for queueing tasks programmatically without exposing an endpoint.

app.py
from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(x):
    result = x * 2
    return {"result": result}

# Manually insert task into the queue
multiply.put(x=10)

If invoked directly from your local computer, the code above will produce this output:

$ python app.py

=> Building image
=> Using cached image
=> Syncing files
=> Files synced

Enqueued task: f0d205da-e74b-47ba-b7c3-8e1b9a3c0669