> ## Documentation Index
> Fetch the complete documentation index at: https://docs.beam.cloud/llms.txt
> Use this file to discover all available pages before exploring further.

# Running Async Tasks

### What Are Task Queues?

Task Queues are great for deploying resource-intensive functions on Beam.

Instead of processing tasks immediately, the task queue enables you to add tasks to a queue and process them later, either sequentially or concurrently.

### Creating a Task Queue

You can run any function as a task queue by using the `task_queue` decorator:

```python theme={null}
from beam import task_queue, Output


@task_queue(cpu=1.0, memory=128)
def handler():
    result = 839 * 18

    # Save the result to a text file
    file_name = "result.txt"
    with open(file_name, "w") as f:
        f.write(f"The result is: {result}")

    # Upload task result to Beam to retrieve later
    Output(path=file_name).save()
```

You’ll be able to access the `result.txt` file when the task completes.

<Tip>
  **Endpoints vs. Task Queues**

  Endpoints are RESTful APIs, designed for synchronous tasks that can complete in 180 seconds or less. For longer running tasks, you'll want to use an async [`task_queue`](/v2/task-queue/running-tasks) instead.
</Tip>

### Sending Async Requests

Because task queues run asynchronously, the API will return a Task ID.

**Example Request**

```bash Request theme={null}
  curl -X POST "https://9655d778-58c2-4c5d-8c55-03735b63607e.app.beam.cloud" \
   -H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
   -H 'Content-Type: application/json' \
   -d '{}'
```

**Example Response**

```bash Response theme={null}
{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }
```

### Viewing Task Responses

Because `task_queue` is async, you will need to make a separate API call to retrieve the task output.

### Saving and Returning Output Files

You can save files using Beam's [Output](/v2/reference/py-sdk#output) class.

The code below saves a file, wraps it in an `Output`, and generates a URL that can be retrieved later:

```python app.py theme={null}
from beam import task_queue, Output


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="A10G",
    callback_url="https://webhook.site/9b74f73d-9ec1-4c8e-adcc-07c78aafab6d",
)
def handler():
    sum = 839 * 18

    # Create a new text file with the result
    file_name = "sum.txt"

    # Write to new text file
    with open(file_name, "w") as f:
        f.write(f"The sum is: {sum}")

    # Save output
    output_file = Output(path=file_name)
    # Uploads the file to Beam storage
    output_file.save()
```

### Retrieving Results

There are two ways to retrieve response payloads:

1. Beam makes a webhook request to your server, based on the [`callback_url`](/v2/topics/callbacks) in your endpoint
2. Saving an `Output` and calling the `/task` API

#### Webhooks

If you've added a [`callback_url`](/v2/topics/callbacks) to your decorator, Beam will fire a webhook to your server with the task response when it completes:

```json theme={null}
{
  "data": {
    "url": "https://app.beam.cloud/output/id/00894876-38df-42c8-a098-879db17e1bf8"
  }
}
```

<Tip>
  For testing purposes, you can setup a temporary webhook URL using
  [https://webhook.site](https://webhook.site)
</Tip>

#### Polling for Results

`Output` payloads can be retrieved by polling the `task` API:

```bash theme={null}
curl -X GET \
  'https://api.beam.cloud/v2/task/{TASK_ID}/' \
  -H 'Authorization: Bearer [YOUR_AUTH_TOKEN]' \
  -H 'Content-Type: application/json'
```

Your Output will be available in the `outputs` list in the response:

```json theme={null}
{
  "id": "828a5f6b-0852-44cb-97dc-3c2105b745d3",
  "started_at": "2025-05-22T23:19:58.995396Z",
  "ended_at": "2025-05-22T23:19:59.061813Z",
  "status": "COMPLETE",
  "container_id": "taskqueue-2365b036-39df-408f-946f-b25025d1251a-bf09bf62",
  "updated_at": "2025-05-22T23:19:59.063168Z",
  "created_at": "2025-05-22T23:19:58.950594Z",
  "outputs": [
    {
      "name": "sum.txt",
      "url": "https://app.beam.cloud/output/id/c339b459-34de-4f0c-adb9-8be7c20951ce",
      "expires_in": 3600
    }
  ],
  "stats": {
    "active_containers": 1,
    "queue_depth": 0
  }
}
```

### Retry Behavior

Task Queues include a built-in retry system. If a task fails for any reason,
such as out-of-memory error or an application exception, your task will be
retried three times before automatically moving to a failed state.

### Programmatically Enqueuing Tasks

You can interact with the task queue either through an API (when deployed), or directly in Python through the `.put()` method.

<Tip>
  This is useful for queueing tasks programmatically without exposing an
  endpoint.
</Tip>

```python app.py theme={null}
from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(x):
    result = x * 2
    return {"result": result}

# Manually insert task into the queue
multiply.put(x=10)
```

If invoked directly from your local computer, the code above will produce this output:

```
$ python app.py

=> Building image
=> Using cached image
=> Syncing files
=> Files synced

Enqueued task: f0d205da-e74b-47ba-b7c3-8e1b9a3c0669
```
