Task Queues are well suited to deploying resource-intensive functions on Beam. Instead of processing each task immediately, a task queue lets you add tasks to a queue and process them later, either sequentially or concurrently.

Task Queues include a built-in retry system. If a task fails for any reason, such as an out-of-memory error or an application exception, the task will be retried three times before automatically moving to a failed state.
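The retry behavior described above can be pictured with a small local simulation. This is only a sketch of the semantics, not Beam's implementation: the function name `run_with_retries`, the status strings, and the reading of "retried three times" as one initial attempt plus three retries are all assumptions made for illustration.

```python
def run_with_retries(task, max_retries=3):
    """Run `task`, retrying on any exception up to `max_retries` times.

    Hypothetical helper: it models one initial attempt plus up to three
    retries, after which the task is reported as failed.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            result = task()
            return {"status": "complete", "result": result, "attempts": attempts}
        except Exception as exc:
            # Give up once the initial attempt and all retries have failed.
            if attempts > max_retries:
                return {"status": "failed", "error": str(exc), "attempts": attempts}
```

A task that always raises ends in the `failed` state after four attempts under this reading, while a task that succeeds on a retry returns its result as usual.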

You can deploy any function to a task queue by using the task_queue decorator:

from beam import App, Volume, Output

app = App(
    name="my-app",
    volumes=[
        Volume(name="files", path="./files"),
    ],
)


@app.task_queue()
def handler():
    return

| Parameter | Required | Description |
| --- | --- | --- |
| runtime | No | The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None. |
| outputs | No | A list of outputs or output configurations for handling task results. Each element can be an Output instance or a dictionary with output configurations. Default is []. |
| autoscaling | No | Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None. |
| loader | No | The loader to use for loading the tasks into the task queue. If not specified, the default loader will be used. Default is None. |
| callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
| max_pending_tasks | No | The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100. |
| keep_warm_seconds | No | The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 0. |
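As a rough illustration of how the parameters listed above might be combined, the sketch below collects a few of them into a dictionary of keyword arguments. The parameter names come from the list above; the values and the callback endpoint are made up for the example, and passing them with `**` to the decorator is an assumption about usage rather than something this page confirms.

```python
# Keyword arguments for the task_queue decorator, using parameter names
# from the list above. All values are illustrative assumptions.
task_queue_options = {
    # Hypothetical endpoint that would receive task status updates.
    "callback_url": "https://example.com/beam-callback",
    # Stop accepting new tasks once 50 are pending (the default is 100).
    "max_pending_tasks": 50,
    # Keep the queue warm for 60 seconds with no pending tasks (the default is 0).
    "keep_warm_seconds": 60,
}

# With the `app` object defined earlier, the options would be passed as:
#
#   @app.task_queue(**task_queue_options)
#   def handler():
#       return
```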

Testing your function

Beam includes a live-reloading feature that allows you to run your code in the same environment you’ll be running in production.

By default, Beam syncs all the files in your working directory to the remote container, so the files you have locally are available while developing. To prevent some files from being uploaded, create a .beamignore file.

In your shell, run beam serve app.py. This will:

  1. Spin up a container
  2. Run it on a remote server
  3. Print a cURL request to invoke the API
  4. Stream the logs to your shell

You should keep this terminal window open while developing.

(.venv) user@MacBook demo % beam serve app.py
 i  Using cached image.
 ✓  App initialized.
 i  Uploading files...
 ✓  Container scheduled, logs will appear below.
⠴ Starting container... 5s (Estimated: 3m20s)

================= Call the API =================

curl -X POST 'https://apps.beam.cloud/serve/3dpga/650b636542ef2e000aef54fa' \
-H 'Accept: */*' \
-H 'Accept-Encoding: gzip, deflate' \
-H 'Connection: keep-alive' \
-H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json' \
-d '{}'

============= Logs Streamed Below ==============

INFO:     | Starting app...
INFO:     | Loading handler in 'app.py:predict'...
INFO:     | Running loader in 'app.py:load_models'...
INFO:     | Ready for tasks.

Now, head back to your IDE, and change a line of code. Hit save.

If you look closely at the shell running beam serve, you’ll notice the server reloading with your code changes.

You’ll use this workflow anytime you’re developing an app on Beam. Because the server reloads on every save, you can iterate on remote code without redeploying.

Deploying Task Queues

To deploy the task queue, enter your shell and run this command from the working directory:

beam deploy app.py:your_function

After running this command, you’ll see some logs in the console that show the progress of your deployment.

Creating a Task

Because task queues run asynchronously, the API does not wait for your function to finish. Instead, it returns a Task ID that identifies the queued task.

Request

  curl -X POST --compressed "https://apps.beam.cloud/dzzf3" \
   -H 'Accept: */*' \
   -H 'Accept-Encoding: gzip, deflate' \
   -H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
   -H 'Connection: keep-alive' \
   -H 'Content-Type: application/json' \
   -d '{"prompt": "a renaissance style photo of steve jobs"}'
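The same request can be made from Python. The sketch below mirrors the cURL call using only the standard library; the helper names `build_task_request` and `create_task` are hypothetical, and the URL, token placeholder, and payload are the ones shown in the cURL example.

```python
import json
import urllib.request


def build_task_request(url, auth_token, payload):
    """Build (but do not send) the POST request that enqueues one task."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Basic {auth_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_task(url, auth_token, payload, timeout=30):
    """Send the request and return the decoded JSON response body."""
    request = build_task_request(url, auth_token, payload)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Example call (requires a deployed app and a real auth token):
#
#   create_task(
#       "https://apps.beam.cloud/dzzf3",
#       "[YOUR_AUTH_TOKEN]",
#       {"prompt": "a renaissance style photo of steve jobs"},
#   )
```

Splitting the request construction from the network call keeps the headers and body easy to inspect before anything is sent.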

Response

{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }
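Since the response carries only a task ID, one way to learn how a task turned out is the callback_url parameter described earlier, which makes Beam send a POST request with status updates. The sketch below is a minimal receiver for such requests built on Python's standard library. The payload format is not documented on this page, so the code assumes a JSON body and simply stores whatever arrives; `CallbackHandler`, `make_server`, and `received_updates` are hypothetical names.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Status updates collected so far (kept in memory for illustration only).
received_updates = []


class CallbackHandler(BaseHTTPRequestHandler):
    """Accept POST requests and record their JSON bodies."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            # Assumption: the status update is sent as a JSON document.
            update = json.loads(body or b"{}")
        except json.JSONDecodeError:
            self.send_response(400)
            self.end_headers()
            return
        received_updates.append(update)
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):
        # Silence the default per-request logging to stderr.
        pass


def make_server(port=8000):
    """Create a server bound to localhost; call serve_forever() to run it."""
    return HTTPServer(("127.0.0.1", port), CallbackHandler)
```

In practice the receiver would need to be reachable from the public internet for Beam to call it; binding to localhost here keeps the sketch safe to run on a development machine.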

Further Reading