Task Queue
Task Queues are great for deploying resource-intensive functions on Beam. Instead of processing tasks immediately, the task queue enables you to add tasks to a queue and process them later, either sequentially or concurrently.
Task Queues include a built-in retry system. If a task fails for any reason, such as out-of-memory error or an application exception, your app will be retried three times before automatically moving to a failed state.
You can deploy any function to a task queue by using the task_queue
decorator:
from beam import App, Volume, Output
app = App(
name="my-app",
volumes=[
Volume(name="files", path="./files"),
],
)
@app.task_queue()
def handler():
return
Parameter | Required | Description |
---|---|---|
runtime | No | The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None. |
outputs | No | A list of outputs or output configurations for handling task results. Each element can be an Output instance or a dictionary with output configurations. Default is []. |
autoscaling | No | Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None. |
loader | No | The loader to use for loading the tasks into the task queue. If not specified, the default loader will be used. Default is None. |
callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
max_pending_tasks | No | The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100. |
keep_warm_seconds | No | The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 0. |
Testing your function
Beam includes a live-reloading feature that allows you to run your code on the same environment you’ll be running in production.
By default, Beam will sync all the files in your working directory to the
remote container. This allows you to use the files you have locally while
developing. If you want to prevent some files from getting uploaded, you can
create a .beamignore
.
In your shell, run beam serve app.py
. This will:
- Spin up a container
- Run it on a remote server
- Print a cURL request to invoke the API
- Stream the logs to your shell
You should keep this terminal window open while developing.
(.venv) user@MacBook demo % beam serve app.py
i Using cached image.
✓ App initialized.
i Uploading files...
✓ Container scheduled, logs will appear below.
⠴ Starting container... 5s (Estimated: 3m20s)
================= Call the API =================
curl -X POST 'https://apps.beam.cloud/serve/3dpga/650b636542ef2e000aef54fa' \
-H 'Accept: */*' \
-H 'Accept-Encoding: gzip, deflate' \
-H 'Connection: keep-alive' \
-H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json' \
-d '{}'
============= Logs Streamed Below ==============
INFO: | Starting app...
INFO: | Loading handler in 'app.py:predict'...
INFO: | Running loader in 'app.py:load_models'...
INFO: | Ready for tasks.
Now, head back to your IDE, and change a line of code. Hit save.
If you look closely at the shell running beam serve
, you’ll notice the server reloading with your code changes.
You’ll use this workflow anytime you’re developing an app on Beam. Trust us — it makes the development process uniquely fast and painless.
Deploying Task Queues
To deploy the task queue, enter your shell and run this command from the working directory:
beam deploy app.py:your_function
After running this command, you’ll see some logs in the console that show the progress of your deployment.
Creating a Task
Because task queues run asynchronously, the API will return a Task ID.
Request
curl -X POST --compressed "https://apps.beam.cloud/dzzf3" \
-H 'Accept: */*' \
-H 'Accept-Encoding: gzip, deflate' \
-H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json' \
-d '{"prompt": "a renaissance style photo of steve jobs"}'
Response
{ "task_id": "edbcf7ff-e8ce-4199-8661-8e15ed880481" }
Further Reading
Was this page helpful?