SDK Reference
App
Defines a namespace for your functions.
App()
Creates an App instance.
Parameters
Parameter | Required | Description |
---|---|---|
name | Yes | The unique identifier for the app. |
volumes | No | A list of storage volumes to be associated with the app. Default is []. |
runtime | No | The runtime environment for the app execution. Defines the container environment and hardware configuration the app will run on. If not specified, a runtime will need to be passed inline to any function triggers attached to this app. Default is None. |
Examples
Running functions in an app
First, instantiate an App() object. You can then decorate any function with the app object and a trigger type, for example @app.run():
from beam import App, Runtime, Image, Output, Volume

app = App(
    name="my_app",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install -y ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.run(outputs=[Output(path="./test.txt")])
def some_function():
    return
App.asgi()
Decorator used for deploying an arbitrary ASGI-compatible application.
ASGI deployments are coming soon. Email us at founders@beam.cloud to learn more.
Parameters
Parameter | Required | Description |
---|---|---|
runtime | No | The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None. |
autoscaler | No | Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler. |
loader | No | A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None. |
max_pending_tasks | No | The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100. |
keep_warm_seconds | No | The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 0. |
task_policy | No | The task policy object, where you can specify max retries and timeout.
timeout | No | The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout) |
workers | No | The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined for your App. You may need to increase these values to utilize more workers. Default is 1. |
authorized | No | Enable global basic authentication for all endpoints in your app. Default is True. |
Examples
from beam import App, Runtime, Image, Volume
from fastapi import FastAPI

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install -y ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.asgi(keep_warm_seconds=1000, workers=2)
def transcribe():
    my_app = FastAPI()

    @my_app.get("/something")
    def get_something():
        return {}

    @my_app.post("/something-else")
    def post_something_else():
        return {}

    return my_app
App.rest_api()
Decorator used for deploying a RESTful API.
This method sets up a RESTful API endpoint that can receive task submissions for processing. When tasks are submitted to this endpoint, they are added to a task queue and processed synchronously. If a task takes longer than 90 seconds to complete, it will continue to run asynchronously, and the results can be retrieved by firing a callback using callback_url or by manually calling api.beam.cloud/v1/task/{task_id}/status/.
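As a sketch, you could poll that status endpoint with any HTTP client. The Authorization header and the response fields below are assumptions; check your dashboard for your API credentials and the exact response schema:

import requests

# Hypothetical task ID, returned when the task was submitted
task_id = "b1234567-0000-0000-0000-000000000000"

resp = requests.get(
    f"https://api.beam.cloud/v1/task/{task_id}/status/",
    headers={"Authorization": "Basic YOUR_API_TOKEN"},  # assumed auth scheme
)
print(resp.json())  # assumed to contain the task's current status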
Parameters
Parameter | Required | Description |
---|---|---|
runtime | No | The runtime environment to execute the tasks submitted via the API. It can be a dictionary containing runtime configurations or an instance of the Runtime class. Default is None. |
outputs | No | A list of outputs or output configurations for handling task results. Default is []. |
autoscaling | No | [DEPRECATED] Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None. |
autoscaler | No | Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler. |
loader | No | A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None. |
callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
max_pending_tasks | No | The maximum number of tasks that can be pending in the queue before rejecting new submissions to the API. Default is 100. |
keep_warm_seconds | No | The duration in seconds to keep the container warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive via the API. Default is 100. |
task_policy | No | The task policy object, where you can specify max retries and timeout.
timeout | No | The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout) |
max_retries | No | The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries) |
workers | No | The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined for your App. You may need to increase these values to utilize more workers. Default is 1. |
Examples
from beam import App, Runtime, Image, Volume

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install -y ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.rest_api(keep_warm_seconds=1000)
def transcribe():
    return
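The loader parameter runs once when the container starts, before any tasks are handled. A minimal sketch, using a module-level variable as a stand-in for an expensive model load (the predict function and its payload are hypothetical):

from beam import App, Runtime

app = App(name="my-app", runtime=Runtime())

model = None

def load_models():
    # Runs exactly once when the app starts
    global model
    model = {"weights": "..."}  # stand-in for loading a real model

@app.rest_api(loader=load_models)
def predict():
    # model is already initialized by the time requests arrive
    return {"loaded": model is not None}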
App.run()
Decorator used for running code immediately.
This method is used to define the configuration for executing your code on Beam, without scheduling it or deploying it as an API.
Parameters
Parameter | Required | Description |
---|---|---|
runtime | No | The runtime environment for the task execution. Default is None. |
outputs | No | A list of artifacts created during the task execution. Default is [].
callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
task_policy | No | The task policy object, where you can specify max retries and timeout.
timeout | No | The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout) |
Examples
Defining a run in your project
from beam import App, Runtime

app = App(name="my-app", runtime=Runtime())

@app.run()
def handler():
    ...
Starting a run with the CLI
When you run the beam run command, the decorated function is executed asynchronously, and the results can be obtained through the /task API or the web dashboard.
When starting a run, you can pass data to your function using the -d argument:
beam run my-file:func -d '{"text": "some-text"}'
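A sketch of the matching handler, assuming the JSON keys passed with -d arrive as keyword arguments (my-file.py and the text key mirror the command above):

# my-file.py
from beam import App, Runtime

app = App(name="my-app", runtime=Runtime())

@app.run()
def func(**inputs):
    # Assumed: -d '{"text": "some-text"}' surfaces here as inputs["text"]
    print(inputs.get("text"))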
App.schedule()
Decorator used for scheduling a task.
This method is used to add configuration for scheduling a task for future execution at a specified time or interval. The task will be added to a scheduler, and when the scheduled time or interval is reached, the task will be executed asynchronously.
Parameters
Parameter | Required | Description |
---|---|---|
when | Yes | The scheduling time or interval for the task execution. It can be a cron expression or a plain-string interval (e.g. every 5m). You can use Crontab to help generate cron expressions.
runtime | No | The runtime environment to execute the scheduled task. Default is None. |
outputs | No | A list of outputs for handling the results of the scheduled task. Default is []. |
callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
task_policy | No | The task policy object, where you can specify max retries and timeout.
timeout | No | The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout) |
max_retries | No | The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries) |
Examples
# Runs every 5 minutes
@app.schedule(when="every 5m")
def every_five_minutes():
    ...

# Runs every 1 hour
@app.schedule(when="every 1h")
def every_hour():
    ...

# Runs every day at midnight
@app.schedule(when="0 0 * * *")
def nightly():
    ...
App.task_queue()
Decorator used for deploying a task queue.
This method allows you to add tasks to a task queue for processing. The tasks are executed asynchronously, and the results can be obtained later by firing a callback using callback_url, or by manually calling api.beam.cloud/v1/task/{task_id}/status/.
Parameters
Parameter | Required | Description |
---|---|---|
runtime | No | The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None. |
outputs | No | A list of outputs or output configurations for handling task results. Each element can be an Output instance or a dictionary with output configurations. Default is []. |
autoscaling | No | [DEPRECATED] Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None. |
autoscaler | No | Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler. |
loader | No | A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None. |
callback_url | No | A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None. |
max_pending_tasks | No | The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100. |
keep_warm_seconds | No | The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 0. |
task_policy | No | The task policy object, where you can specify max retries and timeout.
timeout | No | The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout) |
max_retries | No | The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries) |
workers | No | The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined for your App. You may need to increase these values to utilize more workers. Default is 1. |
Examples
from beam import App, Runtime, Image, Volume

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install -y ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.task_queue(keep_warm_seconds=1000)
def transcribe():
    return
Runtime
Defines the environment a task will be executed in.
Runtime()
Creates a Runtime instance.
It is used to define the CPU, memory, GPU (if applicable), and the container image used for the task execution.
Parameters
Parameter | Required | Description |
---|---|---|
cpu | No | The number of CPU cores allocated to the container. Default is 1. |
memory | No | The amount of memory allocated to the container. It should be specified in Kubernetes resource format (e.g., 500Mi for 500 megabytes). Default is 500Mi. |
gpu | No | The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. Default is GpuType.NoGPU. |
image | No | The container image used for the task execution. Default is Image().
Examples
from beam import App, Runtime, Image

runtime = Runtime(
    cpu=4,
    memory="8Gi",
    gpu="T4",
    image=Image(python_version="python3.9"),
)

app = App(name="my-app", runtime=runtime)

@app.task_queue()
def my_queue():
    ...
Run
Defines an ephemeral task.
Run()
Creates a Run instance.
See App.run() for a description.
Parameters
Parameter | Required | Description |
---|---|---|
handler | Yes | The handler function or entry point of the task to be executed. It should be a reference to the function that will process the task. |
runtime | Yes | The runtime environment for the task execution.
outputs | No | A list of artifacts created during the task execution. Default is [].
**kwargs | No | Additional keyword arguments to pass to the handler. |
Examples
app.py
from beam import App, Runtime

app = App(name="my-app", runtime=Runtime())

@app.run()
def handler():
    ...
main.py
def handler():
    ...
Image
Defines a custom container image that your code will run in.
Image()
Creates an Image instance.
An Image object encapsulates the configuration of a custom container image that will be used as the runtime environment for executing tasks.
Parameters
Parameter | Required | Description |
---|---|---|
python_version | No | The Python version to be used in the image. Default is PythonVersion.Python38. |
python_packages | No | A list of Python packages to install in the container image. Alternatively, a string containing a path to a requirements.txt can be provided. Default is []. |
commands | No | A list of shell commands to run when building your container image. These commands can be used for setting up the environment, installing dependencies, etc. Default is []. |
Examples
from beam import App, Runtime, Image, PythonVersion

image = Image(
    python_version=PythonVersion.Python38,
    python_packages=["numpy", "pandas"],
    commands=["apt update", "apt install -y libgomp1"],
)

app = App(name="my-app", runtime=Runtime(image=image))

@app.task_queue()
def my_queue():
    ...
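Since python_packages also accepts a path to a requirements file, here is a sketch assuming a requirements.txt sits at your project root:

# Assumes ./requirements.txt exists in your project
image = Image(
    python_version=PythonVersion.Python38,
    python_packages="requirements.txt",
)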
Output
Defines which file or directory to save with a task.
Output()
Creates an Output instance.
Saves a file or directory created during task execution and associates it with the task, so it can be downloaded later.
Parameters
Parameter | Required | Description |
---|---|---|
path | Yes | The path of the file or directory produced during task execution. |
Examples
from beam import App, Runtime, Output

app = App(name="my-app", runtime=Runtime())

@app.task_queue(
    outputs=[Output(path="my_file.txt")]
)
def handler():
    ...
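Directories work the same way; a sketch assuming the handler writes its results into a local images directory:

@app.task_queue(outputs=[Output(path="images")])
def save_images():
    ...  # anything written to ./images is saved with the task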
Volume
A data store that can be attached to an app.
Volume()
Creates a Volume instance.
When your app runs, your volume will be available at ./{name} and /volumes/{name}.
Parameters
Parameter | Required | Description |
---|---|---|
name | Yes | The name of the volume, a descriptive identifier for the data volume. |
path | Yes | The path where the volume is mounted within the app environment. |
volume_type | No | The type of volume. Default is VolumeType.Shared.
Examples
from beam import Volume, VolumeType

# Shared Volume
shared_volume = Volume(
    name='some_shared_data',
    path='./my-shared-volume',
)

# Persistent Volume
persistent_volume = Volume(
    name='persistent_data',
    path='./my-persistent-volume',
    volume_type=VolumeType.Persistent,
)
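Inside a running function, an attached volume is just a directory. A sketch assuming an app with the my_models volume mounted at models, as in the earlier examples:

import os

@app.run()
def list_models():
    # The volume mounted with path="models" appears as a local directory
    print(os.listdir("./models"))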
Autoscaling
Configures autoscaling for your functions.
This is deprecated. Please see the RequestLatencyAutoscaler.
Autoscaling()
Creates an Autoscaling instance.
Use this to define an autoscaling strategy for RESTful APIs or task queues.
Parameters
Parameter | Required | Description |
---|---|---|
max_replicas | No | The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1. |
desired_latency | No | The maximum number of seconds to wait before a task is processed. Beam will add or remove replicas to ensure tasks are processed within this window. Default is 100. |
autoscaling_type | No | The type of autoscaling strategy to apply. Default is AutoscalingType.MaxRequestLatency. |
Examples
from beam import App, Runtime, Autoscaling

autoscaling = Autoscaling(
    max_replicas=2,
    desired_latency=30,
)

app = App(name="my-app", runtime=Runtime())

@app.task_queue(autoscaling=autoscaling)
def handler():
    ...
RequestLatencyAutoscaler
Defines a Request Latency based autoscaling strategy.
RequestLatencyAutoscaler()
Creates a RequestLatencyAutoscaler instance.
Parameters
Parameter | Required | Description |
---|---|---|
desired_latency | No | The maximum number of seconds to wait before a task is processed. Beam will add or remove replicas to ensure tasks are processed within this window. Default is 100. |
max_replicas | No | The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1. |
Examples
from beam import App, Runtime, RequestLatencyAutoscaler

app = App(name="my-app", runtime=Runtime())

@app.task_queue(autoscaler=RequestLatencyAutoscaler(desired_latency=300))
def tq1():
    pass

@app.task_queue(autoscaler={"request_latency": {"desired_latency": 300}})
def tq2():
    pass
QueueDepthAutoscaler
Defines a Queue Depth based autoscaling strategy.
QueueDepthAutoscaler()
Creates a QueueDepthAutoscaler instance.
Parameters
Parameter | Required | Description |
---|---|---|
max_tasks_per_replica | No | The maximum number of tasks that can be queued for a single replica. This can help manage throughput and compute cost. When max_tasks_per_replica is 0, a replica can process any number of tasks. Default is 0.
max_replicas | No | The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1. |
Examples
from beam import App, Runtime, QueueDepthAutoscaler

app = App(name="my-app", runtime=Runtime())

@app.task_queue(
    autoscaler=QueueDepthAutoscaler(max_tasks_per_replica=5, max_replicas=3)
)
def tq1():
    pass

@app.task_queue(
    autoscaler={"queue_depth": {"max_tasks_per_replica": 5, "max_replicas": 3}}
)
def tq2():
    pass
PythonVersion
An enum that defines versions of Python.
Variables
Name | Type | Value |
---|---|---|
Python310 | str | "python3.10" |
Python37 | str | "python3.7" |
Python38 | str | "python3.8" |
Python39 | str | "python3.9" |
Examples
from beam import Image, PythonVersion
# with an enum
image = Image(python_version=PythonVersion.Python310)
# with a string
image = Image(python_version="python3.10")
GpuType
An enum that defines types of GPUs.
GPUs L4 and A100 are coming soon. Email us at founders@beam.cloud to learn more.
Variables
Name | Type | Value |
---|---|---|
A100_40 | str | "A100-40" |
A100_80 | str | "A100-80" |
A10G | str | "A10G" |
Any | str | "any" |
L4 | str | "L4" |
NoGPU | str | "" |
T4 | str | "T4" |
Examples
from beam import Runtime, GpuType
r = Runtime(gpu=GpuType.T4)
VolumeType
An enum that defines types of volumes.
Variables
Name | Type | Value |
---|---|---|
Persistent | str | "persistent" |
Shared | str | "shared" |
Examples
from beam import Volume, VolumeType

pv = Volume(
    name='my-persistent-data',
    path='./my-persistent-volume',
    volume_type=VolumeType.Persistent,
)
AutoscalingType
An enum that defines types of autoscaling.
This is deprecated. Please see the RequestLatencyAutoscaler.
Variables
Name | Type | Value |
---|---|---|
MaxRequestLatency | str | "max_request_latency" |
Examples
from beam import Autoscaling, AutoscalingType
a = Autoscaling(autoscaling_type=AutoscalingType.MaxRequestLatency)