What Are Task Queues?
Task Queues are well suited to running resource-intensive functions on Beam.
Instead of processing tasks immediately, the task queue enables you to add tasks to a queue and process them later, either sequentially or concurrently.
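To make the idea concrete, here is a minimal local sketch of a queue that accepts tasks first and processes them later, with either one worker (sequential) or several (concurrent). It uses only the Python standard library and is an illustration of the general pattern, not Beam's implementation; the name run_queue is hypothetical.

```python
import queue
import threading


def run_queue(tasks, num_workers=1):
    """Process callables from a FIFO queue using the given number of workers."""
    q = queue.Queue()
    for task in tasks:
        q.put(task)  # enqueue now, run later

    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return  # queue drained, worker exits
            value = task()
            with lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


tasks = [lambda n=n: n * n for n in range(5)]
sequential = run_queue(tasks, num_workers=1)  # one task at a time, in FIFO order
concurrent = run_queue(tasks, num_workers=3)  # up to three tasks at once
```

With a single worker the results come back in the order the tasks were enqueued; with several workers the completion order is not guaranteed.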
Creating a Task Queue
You can run any function as a task queue by using the task_queue decorator:
from beam import task_queue, Output


@task_queue(cpu=1.0, memory=128)
def handler():
    result = 839 * 18

    # Save the result to a text file
    file_name = "result.txt"
    with open(file_name, "w") as f:
        f.write(f"The result is: {result}")

    # Upload task result to Beam to retrieve later
    Output(path=file_name).save()
You’ll be able to access the result.txt file when the task completes.
Endpoints vs. Task Queues
Endpoints are RESTful APIs designed for synchronous tasks that complete in 180 seconds or less. For longer-running tasks, use an asynchronous task_queue instead.
Sending Async Requests
Because task queues run asynchronously, the API returns a task ID rather than the task's result.
Example Request
curl -X POST "https://9655d778-58c2-4c5d-8c55-03735b63607e.app.beam.cloud" \
-H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json' \
-d '{}'
Example Response
{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }
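The same request can be sketched in Python with the standard library. This is a rough equivalent of the curl call above, assuming the same URL and headers; the helper names build_enqueue_request and parse_task_id are hypothetical, and the token is a placeholder. The request is built but not sent, so the snippet runs offline.

```python
import json
import urllib.request


def build_enqueue_request(url, auth_token, payload):
    """Build (but do not send) the POST request that enqueues a task."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Basic {auth_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def parse_task_id(response_body):
    """Extract the task ID from the JSON body returned by the task queue."""
    return json.loads(response_body)["task_id"]


request = build_enqueue_request(
    "https://9655d778-58c2-4c5d-8c55-03735b63607e.app.beam.cloud",
    "YOUR_AUTH_TOKEN",  # placeholder, replace with your own token
    {},
)

# To actually send the request:
# with urllib.request.urlopen(request) as response:
#     task_id = parse_task_id(response.read())

# Parsing the example response shown above
task_id = parse_task_id('{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }')
```

Keep the returned task ID; it is what you pass to the task API when retrieving results.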
Viewing Task Responses
Because task_queue is async, you will need to make a separate API call to retrieve the task output.
Saving and Returning Output Files
You can save files using Beam’s Output class.
The code below saves a file, wraps it in an Output, and generates a URL that can be retrieved later:
from beam import task_queue, Output


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="A10G",
    callback_url="https://webhook.site/9b74f73d-9ec1-4c8e-adcc-07c78aafab6d",
)
def handler():
    sum = 839 * 18

    # Create a new text file with the result
    file_name = "sum.txt"

    # Write to new text file
    with open(file_name, "w") as f:
        f.write(f"The sum is: {sum}")

    # Save output
    output_file = Output(path=file_name)

    # Uploads the file to Beam storage
    output_file.save()
Retrieving Results
There are two ways to retrieve response payloads:
- Webhook: Beam sends a request to your server at the callback_url set in your decorator
- Polling: save an Output and call the /task API
Webhooks
If you’ve added a callback_url to your decorator, Beam will fire a webhook to your server with the task response when it completes:
{
"data": {
"url": "https://app.beam.cloud/output/id/00894876-38df-42c8-a098-879db17e1bf8"
}
}
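On the receiving side, a webhook handler might look like the sketch below. It assumes the webhook arrives as an HTTP POST with a JSON body shaped like the example above; the HTTP method, the port, and the names extract_output_url and WebhookHandler are assumptions made for illustration, not documented Beam behavior.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def extract_output_url(body):
    """Return data.url from a webhook body, or None if it is absent."""
    payload = json.loads(body)
    data = payload.get("data")
    if not isinstance(data, dict):
        return None
    return data.get("url")


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly the number of bytes announced by the sender
        length = int(self.headers.get("Content-Length", 0))
        url = extract_output_url(self.rfile.read(length))
        print("Task output available at:", url)
        self.send_response(204)  # acknowledge with an empty response
        self.end_headers()


# To listen locally on port 8000 (port chosen arbitrarily):
# HTTPServer(("", 8000), WebhookHandler).serve_forever()

example = '{"data": {"url": "https://app.beam.cloud/output/id/00894876-38df-42c8-a098-879db17e1bf8"}}'
output_url = extract_output_url(example)
```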
Polling for Results
Output payloads can be retrieved by polling the task API:
curl -X GET \
'https://api.beam.cloud/v2/task/{TASK_ID}/' \
-H 'Authorization: Bearer [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json'
Your Output will be available in the outputs list in the response:
{
"id": "828a5f6b-0852-44cb-97dc-3c2105b745d3",
"started_at": "2025-05-22T23:19:58.995396Z",
"ended_at": "2025-05-22T23:19:59.061813Z",
"status": "COMPLETE",
"container_id": "taskqueue-2365b036-39df-408f-946f-b25025d1251a-bf09bf62",
"updated_at": "2025-05-22T23:19:59.063168Z",
"created_at": "2025-05-22T23:19:58.950594Z",
"outputs": [
{
"name": "sum.txt",
"url": "https://app.beam.cloud/output/id/c339b459-34de-4f0c-adb9-8be7c20951ce",
"expires_in": 3600
}
],
"stats": {
"active_containers": 1,
"queue_depth": 0
}
}
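A polling loop over this API could be sketched as follows. The URL, the Bearer header, the "COMPLETE" status, and the outputs fields come from the examples above; the function names, the polling interval, and the attempt limit are assumptions. The source does not list the status reported for failed tasks, so this sketch only recognizes "COMPLETE" and otherwise gives up after max_attempts polls.

```python
import json
import time
import urllib.request


def fetch_task(task_id, auth_token):
    """GET the task record from the v2 task API."""
    request = urllib.request.Request(
        f"https://api.beam.cloud/v2/task/{task_id}/",
        headers={
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def wait_for_outputs(task_id, fetch, interval=2.0, max_attempts=30, sleep=time.sleep):
    """Poll until status is COMPLETE, then return {name: url} for each output."""
    for _ in range(max_attempts):
        task = fetch(task_id)
        if task.get("status") == "COMPLETE":
            return {o["name"]: o["url"] for o in task.get("outputs", [])}
        sleep(interval)
    raise TimeoutError(f"task {task_id} did not complete after {max_attempts} polls")


# Offline demonstration with a stubbed fetch.
# The "PENDING" status is made up for the stub, not taken from Beam.
_stub = iter([
    {"status": "PENDING"},
    {
        "status": "COMPLETE",
        "outputs": [
            {
                "name": "sum.txt",
                "url": "https://app.beam.cloud/output/id/c339b459-34de-4f0c-adb9-8be7c20951ce",
                "expires_in": 3600,
            }
        ],
    },
])
outputs = wait_for_outputs("demo-task", lambda _task_id: next(_stub), sleep=lambda _s: None)
```

Against the real API, the call would be wait_for_outputs(task_id, lambda t: fetch_task(t, "YOUR_AUTH_TOKEN")). Note the expires_in field in the response: the output URL is temporary, so download the file soon after the task completes.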
Retry Behavior
Task Queues include a built-in retry system. If a task fails for any reason, such as an out-of-memory error or an application exception, it is retried three times before automatically moving to a failed state.
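The retry policy can be illustrated locally with a short sketch. This is not Beam's implementation: it assumes "retried three times" means one initial attempt plus up to three retries, and the function name run_with_retries and the "FAILED" status label are hypothetical.

```python
def run_with_retries(task, max_retries=3):
    """Run task; on an exception, retry up to max_retries times, then report failure."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return {"status": "COMPLETE", "result": task(), "attempts": attempts}
        except Exception as exc:
            if attempts > max_retries:
                # Initial attempt plus all retries have failed
                return {"status": "FAILED", "error": str(exc), "attempts": attempts}


calls = {"count": 0}


def flaky():
    """Fail on the first two calls, succeed on the third."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return 839 * 18


def always_fails():
    raise MemoryError("out of memory")


recovered = run_with_retries(flaky)         # succeeds on the third attempt
exhausted = run_with_retries(always_fails)  # fails after 1 attempt + 3 retries
```

Because a task may run more than once, it helps to make task functions safe to repeat, for example by writing outputs to a fixed file name rather than appending.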
Programmatically Enqueuing Tasks
You can interact with the task queue either through an API (when deployed), or directly in Python through the .put() method.
This is useful for queueing tasks programmatically without exposing an endpoint.
from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(x):
    result = x * 2
    return {"result": result}


# Manually insert task into the queue
multiply.put(x=10)
If invoked directly from your local computer, the code above will produce this output:
$ python app.py
=> Building image
=> Using cached image
=> Syncing files
=> Files synced
Enqueued task: f0d205da-e74b-47ba-b7c3-8e1b9a3c0669