App

Defines a namespace for your functions.

App()

Creates an App instance.

Parameters

- name (required): The unique identifier for the app.
- volumes (optional): A list of storage volumes to be associated with the app. Default is [].
- runtime (optional): The runtime environment for the app execution. Defines the container environment and hardware configuration the app will run on. If not specified, a runtime will need to be passed inline to any function triggers attached to this app. Default is None.

Examples

Running functions in an app

First, instantiate an App() object. You can then decorate any function with the app object and a trigger type, for example @app.run():

from beam import App, Runtime, Image, Output, Volume

app = App(
    name="my_app",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.run(outputs=[Output(path="./test.txt")])
def some_function():
    return

App.asgi()

Decorator used for deploying an arbitrary ASGI-compatible application.

Parameters

- runtime (optional): The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None.
- autoscaler (optional): Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler.
- loader (optional): A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None.
- max_pending_tasks (optional): The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100.
- keep_warm_seconds (optional): The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 60s.
- task_policy (optional): The task policy object, where you can specify max retries and timeout.
- timeout (optional): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout.)
- workers (optional): The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, memory, and GPU defined for your app. You may need to increase these values to utilize more workers. Default is 1.
- authorized (optional): Enable global basic authentication for all endpoints in your app. Default is True.

Examples

from beam import App, Runtime, Image, Volume
from fastapi import FastAPI

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.asgi(keep_warm_seconds=1000, workers=2)
def transcribe():
    my_app = FastAPI()

    @my_app.get("/something")
    def get_something():
        return {}

    @my_app.post("/something-else")
    def post_something_else():
        return {}

    return my_app

App.rest_api()

Decorator used for deploying a RESTful API.

This method sets up a RESTful API endpoint that can receive task submissions for processing. When tasks are submitted to this endpoint, they are added to a task queue and processed synchronously. If a task takes longer than 90 seconds to complete, it will continue to run asynchronously, and the results can be retrieved either by firing a callback to callback_url or by retrieving the results manually from api.beam.cloud/v1/task/{task_id}/status/.
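
The status endpoint above follows a fixed URL pattern. As a small illustration (the helper below is ours, not part of the Beam SDK, and the https scheme is an assumption), the status URL for a given task ID can be built like this:

```python
def task_status_url(task_id: str) -> str:
    # Hypothetical helper: builds the task status endpoint described above.
    # The path comes from the docs verbatim; the https scheme is assumed.
    return f"https://api.beam.cloud/v1/task/{task_id}/status/"

print(task_status_url("abc123"))  # https://api.beam.cloud/v1/task/abc123/status/
```

Polling this URL (with your API credentials) is the manual alternative to supplying callback_url.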

Parameters

- runtime (optional): The runtime environment to execute the tasks submitted via the API. It can be a dictionary containing runtime configurations or an instance of the Runtime class. Default is None.
- outputs (optional): A list of outputs or output configurations for handling task results. Default is [].
- autoscaling (optional, deprecated): Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None.
- autoscaler (optional): Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler.
- loader (optional): A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None.
- callback_url (optional): A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None.
- max_pending_tasks (optional): The maximum number of tasks that can be pending in the queue before rejecting new submissions to the API. Default is 100.
- keep_warm_seconds (optional): The duration in seconds to keep the container warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive via the API. Default is 60s.
- task_policy (optional): The task policy object, where you can specify max retries and timeout.
- timeout (optional): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout.)
- max_retries (optional): The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries.)
- workers (optional): The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, memory, and GPU defined for your app. You may need to increase these values to utilize more workers. Default is 1.
- authorized (optional): Enable basic authentication for this function. Get your API keys here: https://docs.beam.cloud/account/api-keys. Default is True.

Examples

from beam import App, Runtime, Image, Volume

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.rest_api(keep_warm_seconds=1000)
def transcribe():
    return

App.run()

Decorator used for running code immediately.

This method is used to define the configuration for executing your code on Beam, without scheduling it or deploying it as an API.

Parameters

- runtime (optional): The runtime environment for the task execution. Default is None.
- outputs (optional): A list of artifacts created during the task execution. Default is [].
- callback_url (optional): A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None.
- task_policy (optional): The task policy object, where you can specify max retries and timeout.
- timeout (optional): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout.)

Examples

Defining a run in your project

from beam import App, Runtime

app = App(name="my-app", runtime=Runtime())

@app.run()
def handler():
    ...

Starting a run with the CLI

When you run the beam run command, the decorated function will run asynchronously, and the results can be obtained through the /task API or the web dashboard.

When starting a run, you can pass data to your function using the -d argument:

beam run my-file:func -d '{"text": "some-text"}'
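
The -d payload must be valid JSON. As a sketch of what happens to it (this mirrors the behavior described above, not the CLI's actual code), each top-level key ends up as a keyword argument to the decorated function:

```python
import json

# The string you would pass to `beam run my-file:func -d '...'`:
payload = '{"text": "some-text"}'

# Each top-level key becomes a keyword argument to the decorated function,
# so a handler like `def func(text=None)` would receive text="some-text".
kwargs = json.loads(payload)
print(kwargs)
```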

App.schedule()

Decorator used for scheduling a task.

This method is used to add configuration for scheduling a task for future execution at a specified time or interval. The task will be added to a scheduler, and when the scheduled time or interval is reached, the task will be executed asynchronously.

Parameters

- when (required): The scheduling time or interval for the task execution. It can be a cron expression or a string shorthand (e.g. every 5m). You can use Crontab to help generate cron expressions.
- runtime (optional): The runtime environment to execute the scheduled task. Default is None.
- outputs (optional): A list of outputs for handling the results of the scheduled task. Default is [].
- callback_url (optional): A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None.
- task_policy (optional): The task policy object, where you can specify max retries and timeout.
- timeout (optional): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout.)
- max_retries (optional): The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries.)

Examples

# Runs every 5 minutes
@app.schedule(when="every 5m")

# Runs every 1 hour
@app.schedule(when="every 1h")

# Runs every day at midnight
@app.schedule(when="0 0 * * *")
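
The "every Nm" / "every Nh" strings above are interval shorthand, while "0 0 * * *" is a standard cron expression. Purely for illustration (this is not Beam's parser), the shorthand maps to a fixed period like so:

```python
def interval_to_seconds(expr: str) -> int:
    # Hypothetical parser for the "every <N><unit>" shorthand shown above.
    # Handles m (minutes) and h (hours); cron expressions are a separate
    # syntax and are not covered here.
    _, amount = expr.split()            # "every 5m" -> "5m"
    value, unit = int(amount[:-1]), amount[-1]
    return value * {"m": 60, "h": 3600}[unit]

print(interval_to_seconds("every 5m"))  # 300
print(interval_to_seconds("every 1h"))  # 3600
```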

App.task_queue()

Decorator used for deploying a task queue.

This method allows you to add tasks to a task queue for processing. The tasks are executed asynchronously, and the results can be obtained later, either by firing a callback to callback_url or by retrieving the results manually from api.beam.cloud/v1/task/{task_id}/status/.

Parameters

- runtime (optional): The runtime environment to execute the tasks. It can be a dictionary containing runtime configurations or an instance of the Runtime class. If not specified, the default runtime will be used. Default is None.
- outputs (optional): A list of outputs or output configurations for handling task results. Each element can be an Output instance or a dictionary with output configurations. Default is [].
- autoscaling (optional, deprecated): Autoscaling configurations for the task queue. It can be a dictionary containing autoscaling settings or an instance of the Autoscaling class. If not provided, autoscaling will not be applied. Default is None.
- autoscaler (optional): Defines an autoscaling strategy for your workload. See RequestLatencyAutoscaler.
- loader (optional): A function that runs exactly once when the app is first started. Useful for loading models or performing initialization of the app. Default is None.
- callback_url (optional): A URL where task execution status updates will be sent. If provided, the system will make a single POST request to this URL with status updates for each task. Default is None.
- max_pending_tasks (optional): The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks. Default is 100.
- keep_warm_seconds (optional): The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.
- task_policy (optional): The task policy object, where you can specify max retries and timeout.
- timeout (optional): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. (Overrides task_policy.timeout.)
- max_retries (optional): The maximum number of times a task can be retried. Default is 3. (Overrides task_policy.max_retries.)
- workers (optional): The number of workers to launch per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, memory, and GPU defined for your app. You may need to increase these values to utilize more workers. Default is 1.
- authorized (optional): Enable basic authentication for this function. Get your API keys here: https://docs.beam.cloud/account/api-keys. Default is True.

Examples

from beam import App, Runtime, Image, Volume

app = App(
    name="vocalize",
    runtime=Runtime(
        cpu=1,
        gpu="A10G",
        image=Image(
            python_version="python3.8",
            python_packages=["torch"],
            commands=["apt-get install ffmpeg"],
        ),
    ),
    volumes=[Volume(name="my_models", path="models")],
)

@app.task_queue(keep_warm_seconds=1000)
def transcribe():
    return

Runtime

Defines the environment a task will be executed in.

Runtime()

Creates a Runtime instance.

It is used to define the CPU, memory, GPU (if applicable), and the container image used for the task execution.

Parameters

- cpu (optional): The number of CPU cores allocated to the container. Default is 1.
- memory (optional): The amount of memory allocated to the container. It should be specified in Kubernetes resource format (e.g., 500Mi for 500 megabytes). Default is 500Mi.
- gpu (optional): The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU is required, leave it empty. Default is GpuType.NoGPU.
- image (optional): The container image used for the task execution. Default is Image.

Examples

from beam import App, Runtime, Image

runtime = Runtime(
    cpu=1,
    memory="8Gi",
    gpu="T4",
    image=Image(python_version="python3.9"),
)

app = App(name="my-app", runtime=runtime)

@app.task_queue()
def my_queue():
    ...

Run

Defines an ephemeral task.

Run()

Creates a Run instance.

See App.run() for a description.

Parameters

- handler (required): The handler function or entry point of the task to be executed. It should be a reference to the function that will process the task.
- runtime (required): The runtime environment for the task execution.
- outputs (optional): A list of artifacts created during the task execution. Default is [].
- **kwargs (optional): Additional keyword arguments to pass to the handler.

Examples

from beam import Run, Runtime

run = Run(handler="handler", runtime=Runtime())

def handler():
    ...

Image

Defines a custom container image that your code will run in.

Image()

Creates an Image instance.

An Image object encapsulates the configuration of a custom container image that will be used as the runtime environment for executing tasks.

Parameters

- python_version (optional): The Python version to be used in the image. Default is PythonVersion.Python38.
- python_packages (optional): A list of Python packages to install in the container image. Alternatively, a string containing a path to a requirements.txt can be provided. Default is [].
- commands (optional): A list of shell commands to run when building your container image. These commands can be used for setting up the environment, installing dependencies, etc. Default is [].
- base_image (optional): A custom base image to replace the default ubuntu20.04 image used in your container, for example docker.io/library/ubuntu:20.04. This image must contain a valid Python executable that matches the version specified in python_version (i.e., python3.8, python3.9, etc.). Default is None.

Examples

from beam import App, Runtime, Image, PythonVersion

image = Image(
    python_version=PythonVersion.Python38,
    python_packages=["numpy", "pandas"],
    commands=["apt update", "apt install -y libgomp1"]
)

app = App(name="my-app", runtime=Runtime(image=image))

@app.task_queue()
def my_queue():
    ...
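
If you supply base_image, the image must already ship the interpreter named in python_version. A minimal sketch of such an image (an illustrative assumption, not an official recipe; Ubuntu 20.04's apt repositories provide python3.8):

```dockerfile
FROM docker.io/library/ubuntu:20.04

# Image(python_version="python3.8") requires a python3.8 executable here.
RUN apt-get update && \
    apt-get install -y --no-install-recommends python3.8 && \
    rm -rf /var/lib/apt/lists/*
```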

Output

Defines which file or directory to save with a task.

Output()

Creates an Output instance.

Saves the file or directory, associates it with a task, and makes it available to download later.

Parameters

- path (required): The path of the file or directory produced during task execution.

Examples

from beam import App, Runtime, Output

app = App(name="my-app", runtime=Runtime())

@app.task_queue(
    outputs=[Output(path="my_file.txt")]
)
def handler():
    ...

Volume

A data store that can be attached to an app.

Volume()

Creates a Volume instance.

When your app runs, your volume will be available at ./{name} and /volumes/{name}.

Parameters

- name (required): The name of the volume, a descriptive identifier for the data volume.
- path (required): The path where the volume is mounted within the app environment.
- volume_type (optional): The type of volume. Default is VolumeType.Shared.

Examples

from beam import Volume, VolumeType

# Shared Volume
shared_volume = Volume(
    name='some_shared_data',
    path='./my-shared-volume'
)

# Persistent Volume
persistent_volume = Volume(
    name='persistent_data',
    path='./my-persistent-volume',
    volume_type=VolumeType.Persistent,
)
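
Per the note above that a volume is available at ./{name} and /volumes/{name} when your app runs, both spellings refer to the same mount. A path-only sketch (nothing here touches Beam itself):

```python
from pathlib import Path

volume_name = "my_models"

# Inside a running task, these two paths point at the same mounted volume:
relative_mount = Path(f"./{volume_name}")
absolute_mount = Path(f"/volumes/{volume_name}")

print(absolute_mount)  # /volumes/my_models
```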

Autoscaling

Configures autoscaling for your functions.

This is deprecated. Please see the RequestLatencyAutoscaler.

Autoscaling()

Creates an Autoscaling instance.

Use this to define an autoscaling strategy for RESTful APIs or task queues.

Parameters

- max_replicas (optional): The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1.
- desired_latency (optional): The maximum number of seconds to wait before a task is processed. Beam will add or remove replicas to ensure tasks are processed within this window. Default is 100.
- autoscaling_type (optional): The type of autoscaling strategy to apply. Default is AutoscalingType.MaxRequestLatency.

Examples

from beam import App, Runtime, Autoscaling

autoscaling = Autoscaling(
    max_replicas=2,
    desired_latency=30,
)

app = App(name="my-app", runtime=Runtime())

@app.task_queue(autoscaling=autoscaling)
def handler():
    ...

RequestLatencyAutoscaler

Defines a Request Latency based autoscaling strategy.

RequestLatencyAutoscaler()

Creates a RequestLatencyAutoscaler instance.

Parameters

- desired_latency (optional): The maximum number of seconds to wait before a task is processed. Beam will add or remove replicas to ensure tasks are processed within this window. Default is 100.
- max_replicas (optional): The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1.

Examples

from beam import App, Runtime, RequestLatencyAutoscaler

app = App(name="my-app", runtime=Runtime())

@app.task_queue(autoscaler=RequestLatencyAutoscaler(desired_latency=300))
def tq1():
    pass

@app.task_queue(autoscaler={"request_latency": {"desired_latency": 300}})
def tq2():
    pass

QueueDepthAutoscaler

Defines a Queue Depth based autoscaling strategy.

QueueDepthAutoscaler()

Creates a QueueDepthAutoscaler instance.

Parameters

- max_tasks_per_replica (optional): The maximum number of tasks that can be queued up to a single replica. This can help manage throughput and the cost of compute. When max_tasks_per_replica is 0, a replica can process any number of tasks. Default is 0.
- max_replicas (optional): The maximum number of replicas that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption. Default is 1.

Examples

from beam import App, Runtime, QueueDepthAutoscaler

app = App(name="my-app", runtime=Runtime())

@app.task_queue(
    autoscaler=QueueDepthAutoscaler(max_tasks_per_replica=5, max_replicas=3)
)
def tq1():
    pass

@app.task_queue(
    autoscaler={"queue_depth": {"max_tasks_per_replica": 5, "max_replicas": 3}}
)
def tq2():
    pass
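
The dict and class forms above are equivalent. The scaling rule itself can be pictured as sizing replicas from the backlog; this is an illustrative model of a queue-depth policy, not Beam's actual implementation:

```python
import math

def desired_replicas(pending_tasks: int, max_tasks_per_replica: int, max_replicas: int) -> int:
    # Illustrative sizing rule: one replica per max_tasks_per_replica pending
    # tasks, capped at max_replicas. A limit of 0 means a single replica may
    # take on any number of tasks.
    if max_tasks_per_replica == 0:
        return 1
    needed = math.ceil(pending_tasks / max_tasks_per_replica)
    return max(1, min(max_replicas, needed))

print(desired_replicas(12, max_tasks_per_replica=5, max_replicas=3))  # 3
```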

PythonVersion

An enum that defines versions of Python.

Variables

- Python310 (str): "python3.10"
- Python37 (str): "python3.7"
- Python38 (str): "python3.8"
- Python39 (str): "python3.9"

Examples

from beam import Image, PythonVersion

# with an enum
image = Image(python_version=PythonVersion.Python310)

# with a string
image = Image(python_version="python3.10")

GpuType

An enum that defines types of GPUs.

The A100-80 GPUs are in beta. Email us at founders@beam.cloud to learn more.

Variables

- A100_80 (str): "A100-80"
- A10G (str): "A10G"
- Any (str): "any"
- NoGPU (str): ""
- T4 (str): "T4"

Examples

from beam import Runtime, GpuType

r = Runtime(gpu=GpuType.T4)

VolumeType

An enum that defines types of volumes.

Variables

- Persistent (str): "persistent"
- Shared (str): "shared"

Examples

from beam import Volume, VolumeType

pv = Volume(
    name='my-persistent-data',
    path='./my-persistent-volume',
    volume_type=VolumeType.Persistent,
)

AutoscalingType

An enum that defines types of autoscaling.

This is deprecated. Please see the RequestLatencyAutoscaler.

Variables

- MaxRequestLatency (str): "max_request_latency"

Examples

from beam import Autoscaling, AutoscalingType

a = Autoscaling(autoscaling_type=AutoscalingType.MaxRequestLatency)