Task Queues are great for deploying resource-intensive functions on Beam. Instead of processing tasks immediately, a task queue lets you add tasks to a queue and process them later, either sequentially or concurrently.

Task Queues include a built-in retry system. If a task fails for any reason, such as an out-of-memory error or an application exception, it will be retried three times before automatically moving to a failed state.

You can deploy any function to a task queue by using the task_queue decorator:

from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(x):
    result = x * 2
    return {"result": result}
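
The multiply function above doesn't actually use the T4 GPU or torch that its decorator requests; those arguments are only there to show how resources are declared. As a rough, illustrative sketch (the run_inference name and its prompt field are hypothetical, not part of the example app), a task that actually uses those resources might look like this:

from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def run_inference(prompt):
    # Import torch inside the function so this module can still be loaded
    # locally, where torch may not be installed
    import torch

    # Run on the GPU requested in the decorator, if it's available
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # ... load a model and run it on `device` ...
    return {"device": device, "prompt": prompt}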

Accessing the queue directly

You can interact with the task queue either through an API (when deployed) or directly in Python through the .put() method.

app.py
from beam import task_queue, Image


@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(x):
    result = x * 2
    return {"result": result}

# Manually insert task into the queue
multiply.put(x=10)

If you run this file from your local computer, the code above will produce output like this:

$ python app.py

=> Building image
=> Using cached image
=> Syncing files
=> Files synced

Enqueued task: f0d205da-e74b-47ba-b7c3-8e1b9a3c0669

Testing with an ephemeral web endpoint

Beam includes a live-reloading feature that allows you to run your code in the same environment you’ll be running in production.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.
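
A .beamignore is a plain text file in your working directory. Assuming it follows gitignore-style patterns, it might look like this (the entries below are only examples):

.beamignore
# Keep local-only files out of the remote container
.git/
__pycache__/
venv/
*.ckpt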

In your shell, run beam serve [FILE.PY]:[FUNCTION]. This will:

  1. Spin up a container
  2. Run it on a remote server
  3. Print a cURL request to invoke the API
  4. Stream the logs to your shell

You should keep this terminal window open while developing.

$ beam serve app.py:multiply

=> Building image
=> Using cached image
=> Syncing files
Reading .beamignore file
=> Files synced

=> Invocation details
curl -X POST 'https://app.beam.cloud/taskqueue/id/e5afd708-faae-48e3-a54f-9113c6722067' \
-H 'Accept: */*' \
-H 'Accept-Encoding: gzip, deflate' \
-H 'Connection: keep-alive' \
-H 'Authorization: Bearer [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json' \
-d '{}'

=> Watching /Users/beta9/beamv2 for changes...
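
The printed cURL command sends an empty JSON body. To actually enqueue work for multiply, replace the empty body with the function's keyword arguments, for example -d '{"x": 10}'.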

Now, head back to your IDE, and change a line of code. Hit save.

If you look closely at the shell running beam serve, you’ll notice the server reloading with your code changes.

You’ll use this workflow anytime you’re developing an app on Beam. Trust us — it makes the development process uniquely fast and painless.

Deploying a task queue endpoint

To deploy the task queue, enter your shell and run this command from the working directory:

beam deploy [FILE.PY]:[ENTRY-POINT] --name [NAME]
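
For the example app in this guide, that command would be:

beam deploy app.py:multiply --name multiply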

After running this command, you’ll see some logs in the console that show the progress of your deployment.

Calling the deployed API

Because task queues run asynchronously, the API will return a Task ID rather than the function's return value.

Request
  curl -X POST --compressed "https://apps.beam.cloud/dzzf3" \
   -H 'Accept: */*' \
   -H 'Accept-Encoding: gzip, deflate' \
   -H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
   -H 'Connection: keep-alive' \
   -H 'Content-Type: application/json' \
   -d '{"x": 10}'

Response
{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }