Beam’s Python SDK is the heart of the Beam platform. Unlike traditional cloud providers, Beam apps are defined entirely in code — no YAML, no config files. All infrastructure and runtime configuration is expressed in Python.

This reference outlines every available decorator, object, and configuration option in the SDK. For a quickstart or high-level overview, check out the Getting Started guide.

Environment

Image

Defines a custom container image that your code will run in.

An Image object encapsulates the configuration of a custom container image that will be used as the runtime environment for executing tasks.

from beam import endpoint, Image


image = (
    Image(
        base_image="docker.io/nvidia/cuda:12.3.1-runtime-ubuntu20.04",
        python_version="python3.9",
    )
    .add_commands(["apt-get update -y", "apt-get install ffmpeg -y"])
    .add_python_packages(["transformers", "torch"])
    .build_with_gpu(gpu="A10G")
)


@endpoint(image=image)
def handler():
    return {}
python_version
string
default:"3.8"

The Python version to be used in the image. Defaults to Python 3.8.

python_packages
list
default:"None"

A list of Python packages to install in the container image. Alternatively, a string containing a path to a requirements.txt file can be provided. Default is None.

commands
list
default:"None"

A list of shell commands to run when building your container image. These commands can be used for setting up the environment, installing dependencies, etc. Default is None.

base_image
string
default:"None"

A custom base image to replace the default ubuntu20.04 image used in your container. This can be a public or private image from Docker Hub, Amazon ECR, Google Cloud Artifact Registry, or NVIDIA GPU Cloud Registry. The formats for these registries are respectively docker.io/my-org/my-image:0.1.0, 111111111111.dkr.ecr.us-east-1.amazonaws.com/my-image:latest, us-east4-docker.pkg.dev/my-project/my-repo/my-image:0.1.0, and nvcr.io/my-org/my-repo:0.1.0. Default is None.

base_image_creds
dict
default:"None"

A dict of credentials, or a list of environment variable names, used to authenticate with a private registry. When provided as a dict, you must supply the keys and values directly. When provided as a list, each key is used to look up the corresponding environment variable value for you. Default is None.

List of Base Image Creds

image = Image(
  base_image="111111111111.dkr.ecr.us-east-1.amazonaws.com/my-app:latest",
  base_image_creds=[
      "AWS_ACCESS_KEY_ID",
      "AWS_SECRET_ACCESS_KEY",
      "AWS_SESSION_TOKEN",
      "AWS_REGION",
  ],
)

Dict of Base Image Creds

image = Image(
    base_image="111111111111.dkr.ecr.us-east-1.amazonaws.com/my-app:latest",
    base_image_creds={
        "AWS_ACCESS_KEY_ID": "xxxx",
        "AWS_SECRET_ACCESS_KEY": "xxxx",
        "AWS_REGION": "xxxx",
    },
)

env_vars
dict
default:"None"

Adds environment variables to an image. These are available both when building the image and when the container is running. Accepts a dictionary of strings, a single string in the format KEY=VALUE, or a list of strings in that same format. Default is None.
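
For example, all three of these forms set the same variable (a sketch; the values are placeholders):

image = Image(env_vars={"MY_API_KEY": "abc123"})
image = Image(env_vars="MY_API_KEY=abc123")
image = Image(env_vars=["MY_API_KEY=abc123", "DEBUG=1"])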

build_with_gpu
string
default:"None"

Builds the image on a GPU. Pass the GPU type to use for the build (e.g., gpu="A10G"), as shown in the example above.

Context

Context is a dataclass used to store various useful fields you might want to access in your entry point logic.

Field Name      Type           Default  Purpose
container_id    Optional[str]  None     Unique identifier for a container
stub_id         Optional[str]  None     Identifier for a stub
stub_type       Optional[str]  None     Type of the stub (function, endpoint, taskqueue, etc.)
callback_url    Optional[str]  None     URL called when the task status changes
task_id         Optional[str]  None     Identifier for the specific task
timeout         Optional[int]  None     Maximum time allowed for the task to run (seconds)
on_start_value  Optional[Any]  None     Any values returned from the on_start function
bind_port       int            0        Port number to bind a service to
python_version  str            ""       Version of Python to be used
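
As a sketch of how these fields are typically consumed (assuming your handler accepts a context argument, and that Context is importable from beam):

from beam import endpoint, Context

def preload():
    # Whatever this returns becomes context.on_start_value in the handler
    return "model-handle"

@endpoint(on_start=preload)
def handler(context: Context, **inputs):
    return {
        "task_id": context.task_id,
        "on_start_value": context.on_start_value,
    }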

Sandbox

A sandboxed container for running Python code or arbitrary processes.

You can use this to create isolated environments where you can execute code, manage files, and run processes.

Sandbox.connect()

Connect to an existing sandbox instance by ID.

id
string

The container ID of the existing sandbox instance.

# Connect to an existing sandbox
instance = sandbox.connect("sandbox-123")

Sandbox.create()

Create a new sandbox instance.

This method creates a new containerized sandbox environment with the specified configuration.

# Create a new sandbox
sandbox = Sandbox()
instance = sandbox.create()
print(f"Sandbox created with ID: {instance.sandbox_id()}")

Sandbox.debug()

Print the debug buffer contents to stdout.

This method outputs any debug information that has been collected during sandbox operations.

SandboxInstance

A sandbox instance that provides access to the sandbox internals.

This class represents an active sandboxed container and provides methods for process management, file system operations, preview URLs, and lifecycle management.
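
The examples in this section use instance, pm, and fs as shorthand handles. Assuming the process manager and filesystem are exposed as attributes of SandboxInstance (the attribute names below are assumptions; check the SDK), they would be obtained like this:

from beam import Sandbox

sandbox = Sandbox()
instance = sandbox.create()

pm = instance.process  # SandboxProcessManager (assumed attribute name)
fs = instance.fs       # SandboxFileSystem (assumed attribute name)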

SandboxInstance.expose_port()

Dynamically expose a port to the internet.

This method creates a public URL that allows external access to a specific port within the sandbox. The URL is SSL-terminated and provides secure access to services running in the sandbox.

port
int

The port number to expose within the sandbox.

# Expose port 8000 for a web service
url = instance.expose_port(8000)
print(f"Web service available at: {url}")

SandboxInstance.sandbox_id()

Get the ID of the sandbox.

sandbox_id = instance.sandbox_id()
print(f"Working with sandbox: {sandbox_id}")

SandboxInstance.terminate()

Terminate the container associated with this sandbox instance.

This method stops the sandbox container and frees up associated resources. Once terminated, the sandbox instance cannot be used for further operations.

# Terminate the sandbox
success = instance.terminate()
if success:
    print("Sandbox terminated successfully")

SandboxInstance.update_ttl()

Update the keep warm setting of the sandbox.

This method allows you to change how long the sandbox will remain active before automatically shutting down.

ttl
int

The number of seconds to keep the sandbox alive. Use -1 for sandboxes that never timeout.

# Keep the sandbox alive for 1 hour
instance.update_ttl(3600)

# Make the sandbox never timeout
instance.update_ttl(-1)

SandboxProcess

Represents a running process within a sandbox.

This class provides control and monitoring capabilities for processes running in the sandbox. It allows you to wait for completion, kill processes, check status, and access output streams.

SandboxProcess.kill()

Kill the process.

This method forcefully terminates the running process. Use this when you need to stop a process that is not responding or when you want to cancel a long-running operation.

process = pm.exec("sleep", "100")

# Kill the process after 5 seconds
import time
time.sleep(5)
process.kill()

SandboxProcess.logs

Returns a combined stream of both stdout and stderr.

This is a convenience property that combines both output streams. The streams are read concurrently, so if one stream is empty, it won’t block the other stream from being read.

process = pm.exec("python3", "-c", "import sys; print('stdout'); print('stderr', file=sys.stderr)")

# Read combined output
for line in process.logs:
    print(f"LOG: {line.strip()}")

# Or read all at once
all_logs = process.logs.read()

SandboxProcess.status()

Get the status of the process.

This method returns the current exit code and status string of the process. An exit code of -1 indicates the process is still running.

process = pm.exec("sleep", "5")

# Check status periodically
while True:
    exit_code, status = process.status()
    if exit_code >= 0:
        print(f"Process finished with exit code: {exit_code}")
        break
    time.sleep(1)

SandboxProcess.stderr

Get a handle to a stream of the process’s stderr.

process = pm.exec("python3", "-c", "import sys; print('Error', file=sys.stderr)")
stderr_content = process.stderr.read()
print(f"STDERR: {stderr_content}")

SandboxProcess.stdout

Get a handle to a stream of the process’s stdout.

process = pm.exec("echo", "Hello World")
stdout_content = process.stdout.read()
print(f"STDOUT: {stdout_content}")

SandboxProcess.wait()

Wait for the process to complete.

This method blocks until the process finishes execution and returns the exit code. It polls the process status until completion.

process = pm.exec("long_running_command")
exit_code = process.wait()
if exit_code == 0:
    print("Command completed successfully")

SandboxProcessManager

Manager for executing and controlling processes within a sandbox.

This class provides a high-level interface for running commands and Python code within the sandbox environment. It supports both blocking and non-blocking execution, environment variable configuration, and working directory specification.

SandboxProcessManager.exec()

Run an arbitrary command in the sandbox.

This method executes shell commands within the sandbox environment. The command is executed using the shell available in the sandbox.

*args
string

The command and its arguments to execute.

cwd
Optional[str]
default:"None"

The working directory to run the command in. Default is None.

env
Optional[Dict[str, str]]
default:"None"

Environment variables to set for the command. Default is None.

# Run a simple command
process = pm.exec("ls", "-la")
process.wait()

# Run with custom environment
process = pm.exec("echo", "$CUSTOM_VAR", env={"CUSTOM_VAR": "hello"})

# Run in specific directory
process = pm.exec("pwd", cwd="/tmp")

SandboxProcessManager.get_process()

Get a process by its PID.

pid
int

The process ID to look up.

try:
    process = pm.get_process(12345)
    print(f"Found process: {process.pid}")
except SandboxProcessError:
    print("Process not found")

SandboxProcessManager.list_processes()

List all processes running in the sandbox.

processes = pm.list_processes()
for process in processes:
    print(f"Process {process.pid} is running")

SandboxProcessManager.run_code()

Run Python code in the sandbox.

This method executes Python code within the sandbox environment. The code is executed using the Python interpreter available in the sandbox.

code
string

The Python code to execute.

blocking
boolean
default:"True"

Whether to wait for the process to complete. If True, returns SandboxProcessResponse. If False, returns SandboxProcess.

cwd
Optional[str]
default:"None"

The working directory to run the code in. Default is None.

env
Optional[Dict[str, str]]
default:"None"

Environment variables to set for the process. Default is None.

# Run blocking Python code
result = pm.run_code("print('Hello from sandbox!')")
print(result.result)

# Run non-blocking Python code
process = pm.run_code("import time; time.sleep(10)", blocking=False)
# Do other work while process runs
process.wait()

SandboxProcessResponse

Response object containing the results of a completed process execution.

This class encapsulates the output and status information from a process that has finished running in the sandbox.

SandboxProcessStream

A stream-like interface for reading process output in real-time.

This class provides an iterator interface for reading stdout or stderr from a running process. It buffers output and provides both line-by-line iteration and bulk reading capabilities.

Example:

# Get a process stream
process = pm.exec("echo", "Hello World")

# Read line by line
for line in process.stdout:
    print(f"Output: {line.strip()}")

# Read all output at once
all_output = process.stdout.read()

SandboxProcessStream.read()

Read all remaining output from the stream.

SandboxProcessError

Raised when a sandbox process operation fails.

SandboxConnectionError

Raised when a connection to a sandbox cannot be established.

SandboxFileInfo

Metadata of a file in the sandbox.

This class provides detailed information about files and directories within the sandbox filesystem, including permissions, ownership, and modification times.

SandboxFileSystem

File system interface for managing files within a sandbox.

This class provides a comprehensive API for file operations within the sandbox, including uploading, downloading, listing, and managing files and directories.

SandboxFileSystem.create_directory()

Create a directory in the sandbox.

Note: This method is not yet implemented.

sandbox_path
string

The path where the directory should be created.

SandboxFileSystem.delete_directory()

Delete a directory in the sandbox.

Note: This method is not yet implemented.

sandbox_path
string

The path of the directory to delete.

SandboxFileSystem.delete_file()

Delete a file in the sandbox.

This method removes a file from the sandbox filesystem.

sandbox_path
string

The path to the file within the sandbox.

# Delete a temporary file
fs.delete_file("/tmp/temp_file.txt")

# Delete a log file
fs.delete_file("/var/log/old_log.log")

SandboxFileSystem.download_file()

Download a file from the sandbox to a local path.

This method downloads a file from the sandbox filesystem and saves it to the specified local path.

sandbox_path
string

The path to the file within the sandbox.

local_path
string

The destination path on the local filesystem.

# Download a log file
fs.download_file("/var/log/app.log", "local_app.log")

# Download to a specific directory
fs.download_file("/output/result.txt", "./results/result.txt")

SandboxFileSystem.find_in_files()

Find files matching a pattern in the sandbox.

This method searches for files within the specified directory that match the given pattern.

sandbox_path
string

The directory path to search in.

pattern
string

The pattern to match files against.

# Find all Python files
python_files = fs.find_in_files("/workspace", "*.py")

# Find all log files
log_files = fs.find_in_files("/var/log", "*.log")

SandboxFileSystem.list_files()

List the files in a directory in the sandbox.

This method returns information about all files and directories within the specified directory in the sandbox.

sandbox_path
string

The path to the directory within the sandbox.

# List files in the root directory
files = fs.list_files("/")
for file_info in files:
    if file_info.is_dir:
        print(f"Directory: {file_info.name}")
    else:
        print(f"File: {file_info.name} ({file_info.size} bytes)")

# List files in a specific directory
workspace_files = fs.list_files("/workspace")

SandboxFileSystem.replace_in_files()

Replace a string in all files in a directory.

This method performs a find-and-replace operation on all files within the specified directory, replacing occurrences of the old string with the new string.

sandbox_path
string

The directory path to search in.

old_string
string

The string to find and replace.

new_string
string

The string to replace with.

# Replace a configuration value
fs.replace_in_files("/config", "old_host", "new_host")

# Update version numbers
fs.replace_in_files("/app", "1.0.0", "1.1.0")

SandboxFileSystem.stat_file()

Get the metadata of a file in the sandbox.

This method retrieves detailed information about a file or directory within the sandbox, including size, permissions, ownership, and modification time.

sandbox_path
string

The path to the file within the sandbox.

# Get file information
file_info = fs.stat_file("/path/to/file.txt")
print(f"File size: {file_info.size} bytes")
print(f"Is directory: {file_info.is_dir}")
print(f"Modified: {file_info.mod_time}")

SandboxFileSystem.upload_file()

Upload a local file to the sandbox.

This method reads a file from the local filesystem and uploads it to the specified path within the sandbox.

local_path
string

The path to the local file to upload.

sandbox_path
string

The destination path within the sandbox.

# Upload a Python script
fs.upload_file("my_script.py", "/workspace/script.py")

# Upload to a subdirectory
fs.upload_file("config.json", "/app/config/config.json")

SandboxFileSystemError

Raised when a sandbox file system operation fails.

SandboxFilePosition

A position in a file.

SandboxFileSearchMatch

A match in a file.

SandboxFileSearchRange

A range in a file.

Pod

A Pod is an object that allows you to run arbitrary services in a fast, scalable, and secure remote container on Beam.

You can think of a Pod as a lightweight compute environment that you fully control—complete with a custom container, ports you can expose, environment variables, volumes, secrets, and GPUs.

from beam import Pod, Image

# Create a Pod that runs a simple HTTP server
pod = Pod(
    name="web-server",
    cpu=2,
    memory="512Mi",
    image=Image(
        base_image="python:3.9-slim",
        python_packages=["requests"],
    ),
    ports=[8000],  # We'll expose port 8000
)

# Spin up the Pod container, running `python -m http.server 8000`
result = pod.create(entrypoint=["python", "-m", "http.server", "8000"])

print("Container ID:", result.container_id)
print("URL:", result.url)

When you run this snippet (e.g., python app.py), Beam will:

  • Build your container (if necessary) and sync your local files to the remote environment.
  • Create a Pod container with the specified resources (2 CPU cores, 512 MiB memory).
  • Run python -m http.server 8000 inside that remote container.
  • Expose the container on port 8000. You’ll get back a container ID and a URL to access it.

Once the Pod is running, you can perform additional operations—like opening an interactive shell inside the container or deploying the Pod as a named app.

entrypoint
List[str]
default:"[]"

The command to run in the container. By default, nothing is specified, so you must provide an entrypoint to actually run anything. You can override or provide this entrypoint at creation time using pod.create(entrypoint=...).

ports
Optional[List[int]]
default:"[]"

A list of ports to expose. If provided, the container will be accessible through an HTTP URL for each port opened. For example, if [8000] is specified, you’ll get <Pod URL>:8000.

name
Optional[str]
default:"None"

An optional name for the pod. If you plan to deploy this Pod (i.e., treat it as a persistent app), you should specify a name. If you do not specify a name, Beam will generate a random name at deploy time, or you must specify --name=... in the CLI.

cpu
Union[int, float, str]
default:"1.0"

The amount of CPU allocated to the container. For example, 2 means 2 CPU cores, "500m" means half a CPU core, and 1.0 means 1 CPU core.

memory
Union[int, str]
default:"128"

The amount of memory (in MiB) allocated to the container. You can also specify this as a string with units (e.g., "512Mi", "2Gi").

gpu
Union[GpuTypeAlias, List[GpuTypeAlias]]
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. You can specify multiple GPUs by providing a list (in which case the scheduler prioritizes their selection based on the order in the list). If no GPU is required, leave it empty.
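
For example, to prefer an A10G but fall back to a T4 (a sketch; actual GPU availability depends on your workspace):

from beam import Pod

# The scheduler prioritizes GPUs in list order: A10G first, then T4
pod = Pod(gpu=["A10G", "T4"], gpu_count=1)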

gpu_count
int
default:"0"

The number of GPUs allocated to the container. If a GPU is specified but this value is set to 0, it will automatically be updated to 1.

image
Image
default:"Image()"

The container image to be used for running the Pod. Defaults to a basic Beam Image object, which can be customized (e.g., base_image=, python_packages=, and more).

volumes
Optional[List[Volume]]
default:"None"

A list of volumes to be mounted into the container. Volumes allow you to persist data or mount external storage services, such as S3-compatible buckets.

secrets
Optional[List[str]]
default:"None"

A list of secrets that are injected into the container as environment variables. Each secret must be configured in your Beam project.
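
For example, assuming a secret named OPENAI_API_KEY has already been created in your Beam project:

from beam import Pod

pod = Pod(secrets=["OPENAI_API_KEY"])
# Inside the container, read it like any environment variable:
# os.environ["OPENAI_API_KEY"]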

env
Optional[Dict[str, str]]
default:"{}"

A dictionary of environment variables to inject into the container. For example: {"MY_API_KEY": "abc123"}.

keep_warm_seconds
int
default:"-1"

The number of seconds to keep the container alive after the last request. A value of -1 means never scale down to zero (i.e., keep the container running indefinitely). This only applies if you deploy the Pod.

authorized
bool
default:"False"

If False, allows the container to be accessed without an auth token. This is useful for public-facing services. If you need to secure it behind an auth token, set it to True.

Create

Create a new container that runs until it completes or is explicitly killed.

from beam import Pod

pod = Pod(cpu=2, memory="1Gi", ports=[8080])
result = pod.create(entrypoint=["python", "-m", "http.server", "8080"])

if result.ok:
    print("Pod created successfully!")
    print("Container ID:", result.container_id)
    print("URL:", result.url)
else:
    print("Failed to create Pod")

Deploy

Deploy the Pod as a named persistent service. Pods can be deployed programmatically via Python or through the CLI.

Deploying via Python

app.py
from beam import Pod

pod = Pod(
    name="my-deployed-pod",
    cpu=2,
    memory="1Gi",
    ports=[8000],
    entrypoint=["python", "-m", "http.server", "8000"],
)

# Deploy the Pod
ok = pod.deploy()
if ok:
    print("Pod successfully deployed!")
else:
    print("Pod deployment failed!")
python app.py

Deploying via CLI

app.py
from beam import Pod

pod = Pod(
    name="my-deployed-pod",
    cpu=2,
    memory="1Gi",
    ports=[8000],
    entrypoint=["python", "-m", "http.server", "8000"],
)

beam deploy app.py:pod

Function

Decorator for defining a remote function.

This method allows you to run the decorated function in a remote container.

Function
from beam import Image, function


@function(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    print(filename)
    return "some_result"


# Call a function in a remote container
transcribe.remote("some_file.mp4")

# Map the function over several inputs
# Each of these inputs will be routed to remote containers
for result in transcribe.map(["file1.mp4", "file2.mp4"]):
    print(result)
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in MiB, or as a string with units (e.g., “1Gi”).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. Multiple GPUs can be specified as a list.

image
string
default:"Image"

The container image used for task execution.

timeout
int
default:"3600"

The maximum number of seconds a task can run before timing out. Set to -1 to disable the timeout.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

callback_url
string
default:"None"

An optional URL to send a callback to when a task is completed, timed out, or cancelled.

volumes
list
default:"None"

A list of storage volumes to be associated with the function.

secrets
list
default:"None"

A list of secrets that are injected into the container as environment variables.

name
string
default:"None"

An optional name for this function, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.

task_policy
TaskPolicy
default:"None"

The task policy for the function. This helps manage the lifecycle of an individual task. Setting values here will override timeout and retries.
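
A minimal sketch, assuming TaskPolicy is importable from beam and exposes max_retries and timeout fields (the field names are assumptions; check the SDK for the exact signature):

from beam import TaskPolicy, function

@function(task_policy=TaskPolicy(max_retries=5, timeout=600))  # assumed field names
def flaky_job():
    ...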

retry_for
list
default:"None"

A list of exceptions that will trigger a retry.

headless
boolean
default:"False"

Determines whether the function continues running in the background after the client disconnects.

Remote

You can run any function remotely on Beam by using the .remote() method:

from beam import function


@function(cpu=8)
def square(i: int):
    return i**2


if __name__ == "__main__":
    # Run the `square` function remotely on Beam
    result = square.remote(i=12)
    print(result)

The code above is invoked by running python example.py:

(.venv) user@MacBook % python example.py
=> Building image
=> Using cached image
=> Syncing files
=> Files synced
=> Running function: <example:square>
=> Function complete <908c76b1-ee68-4b33-ac3a-026ae646625f>
144

Map

You can scale out workloads to many containers using the .map() method. You might use this for parallelizing computational-heavy tasks, such as batch inference or data processing jobs.

from beam import function


@function(cpu=8)
def square(i: int):
    return i**2


def main():
    numbers = list(range(10))
    squared = []

    # Run a remote container for every item in list
    for result in square.map(numbers):
        squared.append(result)


if __name__ == "__main__":
    main()

Schedule

This method allows you to schedule the decorated function to run at specific intervals defined by a cron expression.

from beam import schedule


@schedule(when="*/5 * * * *", name="scheduled-job")
def task():
    print("Hi, from scheduled task!")
when
string
default:"None"

The cron expression or predefined schedule that determines when the task will run. This parameter defines the interval or specific time when the task should execute.

cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.

image
string
default:"Image"

The container image used for the task execution.

timeout
float
default:"180"

The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.

workers
int
default:"1"

The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.

keep_warm_seconds
int
default:"300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

callback_url
string
default:"None"

An optional URL to send a callback to when a task is completed, timed out, or cancelled.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

secrets
list
default:"None"

A list of secrets that are injected into the container as environment variables.

name
string
default:"None"

An optional name for this scheduled function, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.

Scheduling Options

Predefined Schedule     Description                                                  Cron Expression
@yearly (or @annually)  Run once a year at midnight on January 1st                   0 0 1 1 *
@monthly                Run once a month at midnight on the first day of the month   0 0 1 * *
@weekly                 Run once a week at midnight on Sunday                        0 0 * * 0
@daily (or @midnight)   Run once a day at midnight                                   0 0 * * *
@hourly                 Run once an hour at the beginning of the hour                0 * * * *
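
For example, @daily is equivalent to the cron expression 0 0 * * *:

from beam import schedule

@schedule(when="@daily", name="nightly-cleanup")
def nightly_cleanup():
    print("Runs every day at midnight")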

Endpoint

Decorator used for deploying a web endpoint.

from beam import endpoint, Image


@endpoint(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(**inputs):
    result = inputs["x"] * 2
    return {"result": result}
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.

image
string
default:"Image"

The container image used for the task execution.

timeout
float
default:"180"

The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.

workers
int
default:"1"

The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.

keep_warm_seconds
int
default:"300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

on_start
Function
default:"None"

A function that runs when the container first starts. The return values of the on_start function can be retrieved by passing a context argument to your handler function.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

secrets
list
default:"None"

A list of secrets that are injected into the container as environment variables.

name
string
default:"None"

An optional name for this endpoint, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.

authorized
boolean
default:"True"

If false, allows the endpoint to be invoked without an auth token.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

Serve

beam serve monitors changes in your local file system, live-reloads the remote environment as you work, and forwards remote container logs to your local shell.

Serve is great for prototyping. You can develop in a containerized cloud environment in real-time, with adjustable CPU, memory, GPU resources.

It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.

To start an ephemeral serve session, you’ll use the serve command:

beam serve app.py

Sessions end automatically after 10 minutes of inactivity.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.

Task Queue

Decorator for defining a task queue.

This method allows you to create a task queue out of the decorated function.

The tasks are executed asynchronously. You can interact with the task queue either through an API (when deployed), or directly in Python through the .put() method.

Task Queue
from beam import Image, task_queue


# Define the task queue
@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    return {}


transcribe.put("some_file.mp4")
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.

image
string
default:"Image"

The container image used for the task execution.

timeout
float
default:"180"

The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.

workers
int
default:"1"

The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.

keep_warm_seconds
int
default:"300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

callback_url
string
default:"None"

An optional URL to send a callback to when a task is completed, timed out, or cancelled.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

secrets
list
default:"None"

A list of secrets that are injected into the container as environment variables.

name
string
default:"None"

An optional name for this task queue, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.

retry_for
list
default:"None"

A list of exceptions that will trigger a retry.
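
A sketch of retry_for, assuming exception classes are passed directly in the list:

from beam import task_queue

@task_queue(retries=3, retry_for=[ConnectionError, TimeoutError])
def process_file(filename: str):
    ...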

Serve

beam serve monitors changes in your local file system, live-reloads the remote environment as you work, and forwards remote container logs to your local shell.

Serve is great for prototyping. You can develop in a containerized cloud environment in real-time, with adjustable CPU, memory, GPU resources.

It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.

To start an ephemeral serve session, you’ll use the serve command:

beam serve app.py

Sessions end automatically after 10 minutes of inactivity.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.

ASGI

Decorator used for creating and deploying an ASGI application.

from beam import asgi, Image


@asgi(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["fastapi"]),
    keep_warm_seconds=10,
    max_pending_tasks=100,
)
def web_server(context):
    from fastapi import FastAPI

    app = FastAPI()

    @app.post("/hello")
    async def something():
        return {"hello": True}

    @app.post("/warmup")
    async def warmup():
        return {"status": "warm"}

    return app
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in MiB, or as a string with units (e.g., “1Gi”).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.

image
string
default:"Image"

The container image used for task execution.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

timeout
int
default:"3600"

The maximum number of seconds a task can run before timing out. Set to -1 to disable the timeout.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

workers
int
default:"1"

The number of processes handling tasks per container. Workers share CPU, memory, and GPU resources.

concurrent_requests
int
default:"1"

The maximum number of concurrent requests the ASGI application can handle.

keep_warm_seconds
int
default:"10"

The duration in seconds to keep the task queue warm when there are no pending tasks.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue.

secrets
list
default:"None"

A list of secrets injected into the container as environment variables.

name
string
default:"None"

An optional name for this ASGI application, used during deployment.

authorized
boolean
default:"True"

If false, allows the ASGI application to be invoked without an auth token.

autoscaler
Autoscaler
default:"QueueDepthAutoscaler()"

Configure deployment autoscaling using various strategies.

callback_url
string
default:"None"

An optional URL to send a callback when a task is completed, timed out, or canceled.

task_policy
TaskPolicy
default:"None"

The task policy for the function, overriding timeout and retries.

Serve

beam serve monitors changes in your local file system, live-reloads the remote environment as you work, and forwards remote container logs to your local shell.

Serve is great for prototyping. You can develop in a containerized cloud environment in real-time, with adjustable CPU, memory, GPU resources.

It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.

To start an ephemeral serve session, you’ll use the serve command:

beam serve app.py

Sessions end automatically after 10 minutes of inactivity.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.

Realtime

Decorator for creating a real-time application built on top of ASGI/websockets.
The handler function runs every time a message is received over the websocket.

from beam import realtime

def generate_text():
    return ["this", "could", "be", "anything"]

@realtime(
    cpu=1.0,
    memory=128,
    gpu="T4"
)
def handler(context):
    return generate_text()
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
string
default:"128"

The amount of memory allocated to the container. It should be specified in MiB, or as a string with units (e.g., “1Gi”).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU is required, leave it empty.

image
string
default:"Image"

The container image used for task execution.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

timeout
int
default:"3600"

The maximum number of seconds a task can run before timing out. Set to -1 to disable the timeout.

workers
int
default:"1"

The number of processes handling tasks per container. Workers share CPU, memory, and GPU resources.

concurrent_requests
int
default:"1"

The maximum number of concurrent requests the ASGI application can handle. This allows processing multiple requests concurrently.

keep_warm_seconds
int
default:"10"

The duration in seconds to keep the task queue warm even if there are no pending tasks.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue.

secrets
list
default:"None"

A list of secrets injected into the container as environment variables.

name
string
default:"None"

An optional name for this ASGI application, used during deployment. If not specified, you must provide the name during deployment.

authorized
boolean
default:"True"

If false, allows the ASGI application to be invoked without an auth token.

autoscaler
Autoscaler
default:"QueueDepthAutoscaler()"

Configure a deployment autoscaler to scale the function horizontally using various autoscaling strategies.

callback_url
string
default:"None"

An optional URL to send a callback to when a task is completed, timed out, or canceled.

Serve

beam serve monitors changes in your local file system, live-reloads the remote environment as you work, and forwards remote container logs to your local shell.

Serve is great for prototyping. You can develop in a containerized cloud environment in real-time, with adjustable CPU, memory, GPU resources.

It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.

To start an ephemeral serve session, you’ll use the serve command:

beam serve app.py

Sessions end automatically after 10 minutes of inactivity.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.

Bot

Decorator for defining a bot with multiple states and transitions.

The bot decorator allows you to define a bot with specific states (locations) and transitions. These bots run as distributed, stateful workflows, where each transition is executed in a remote container.

from beam import Bot, BotContext, BotLocation, Image
from pydantic import BaseModel


# Define input and output types for the bot
class ProductName(BaseModel):
    product_name: str


class URL(BaseModel):
    url: str


class ReviewPage(BaseModel):
    review_page: str


# Create the bot
bot = Bot(
    model="gpt-4o",
    api_key="YOUR_API_KEY",
    locations=[
        BotLocation(marker=ProductName),
        BotLocation(marker=URL, expose=False),
        BotLocation(marker=ReviewPage, expose=False),
    ],
    description="This bot retrieves product reviews and summarizes them.",
)


# Define a transition
@bot.transition(
    inputs={ProductName: 1},
    outputs=[URL],
    description="Retrieve Google Shopping URLs for a product",
    cpu=1,
    memory=128,
    image=Image(python_packages=["serpapi", "google-search-results"]),
)
def get_product_urls(context: BotContext, inputs):
    product_name = inputs[ProductName][0].product_name
    # Perform some action
    return {URL: [URL(url="https://example.com")]}
model
string
default:"None"

The underlying language model (e.g., "gpt-4o") used by the bot.

api_key
string
default:"None"

The OpenAI API key used to authenticate requests to OpenAI.

locations
list
default:"[]"

A list of BotLocation objects defining the bot’s states. Each location corresponds to a type (e.g., BaseModel) that the bot operates on.

description
string
default:"None"

A human-readable description of the bot’s purpose.

authorized
bool
default:"False"

Specifies whether the bot requires an auth token passed to invoke it.

cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
int
default:"128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.

image
string
default:"Image"

The container image used for the task execution.

timeout
float
default:"180"

The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.

workers
int
default:"1"

The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.

keep_warm_seconds
int
default:"300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive.

max_pending_tasks
int
default:"100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

on_start
Function
default:"None"

A function that runs when the container first starts. The return values of the on_start function can be retrieved by passing a context argument to your handler function.

volumes
list
default:"None"

A list of volumes to be mounted to the container.

secrets
list
default:"None"

A list of secrets that are injected into the container as environment variables.

name
string
default:"None"

An optional name for this bot, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.

retries
int
default:"3"

The maximum number of times a task will be retried if the container crashes.

Autoscaling

QueueDepthAutoscaler

Adds an autoscaler to an app.

from beam import endpoint, QueueDepthAutoscaler


@endpoint(
    autoscaler=QueueDepthAutoscaler(
        min_containers=1, max_containers=3, tasks_per_container=1
    ),
)
def handler():
    return {"success": "true"}
min_containers
number
default:"0"

The number of containers to keep running at baseline. The containers will continue running until the deployment is stopped.

tasks_per_container
number
default:"0"

The max number of tasks that can be queued up to a single container. This can help manage throughput and cost of compute. When tasks_per_container is 0, a container can process any number of tasks.

max_containers
number
default:"1"

The maximum number of containers that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption.

Data Structures

Simple Queue

Creates a Queue instance.

Use this as a concurrency-safe distributed queue, accessible both locally and within remote containers.

Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is that of a standard Python queue.

Because this is backed by a distributed queue, it will persist between runs.

Simple Queue
from beam import Queue

val = [1, 2, 3]

# Initialize the Queue
q = Queue(name="myqueue")

for i in range(100):
    # Insert something to the queue
    q.put(val)

while not q.empty():
    # Remove something from the queue
    val = q.pop()
    print(val)
name
string
default:"None"
required

The name of the queue (any arbitrary string).

Map

Creates a Map Instance.

Use this as a concurrency-safe key/value store, accessible both locally and within remote containers.

Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is that of a standard Python dictionary.

Because this is backed by a distributed dictionary, it will persist between runs.

Map
from beam import Map

# Name the map
m = Map(name="test")

# Set a key
m["some_key"] = True

# Delete a key
del m["some_key"]

# Iterate through the map
for k, v in m.items():
    print("key: ", k)
    print("value: ", v)
name
string
default:"None"
required

The name of the map (any arbitrary string).

Storage

Beam allows you to create highly-available storage volumes that can be used across tasks. You might use volumes for things like storing model weights or large datasets.

Volume

Creates a Volume instance.

When your container runs, your volume will be available at ./{name} and /volumes/{name}.

from beam import function, Volume


VOLUME_PATH = "./model_weights"


@function(
    volumes=[Volume(name="model-weights", mount_path=VOLUME_PATH)],
)
def load_model():
    from transformers import AutoModel

    # Load model from cloud storage cache
    AutoModel.from_pretrained(VOLUME_PATH)
name
string
default:"None"
required

The name of the volume, a descriptive identifier for the data volume.

mount_path
string
default:"None"
required

The path where the volume is mounted within the container environment.

CloudBucket

Creates a CloudBucket instance.

When your container runs, your cloud bucket will be available at ./{name} and /volumes/{name}.

from beam import CloudBucket, CloudBucketConfig, function

# Cloud Bucket
weights = CloudBucket(
    name="weights",
    mount_path="./weights",
    config=CloudBucketConfig(
        access_key="my-access-key",
        secret_key="my-secret-key",
        endpoint="https://s3-endpoint.com",
    ),
)

@function(volumes=[weights])
def my_function():
    pass
name
string
required

The name of the cloud bucket, must be the same as the bucket name in the cloud provider.

mount_path
string
required

The path where the cloud bucket is mounted within the container environment.

config
CloudBucketConfig
required

Configuration for the cloud bucket.

CloudBucketConfig

Configuration for a cloud bucket.

from beam import CloudBucketConfig

config = CloudBucketConfig(
    read_only=False,
    access_key="my-access-key",
    secret_key="my-secret-key",
    endpoint="https://s3-endpoint.com",
    region="us-west-2"
)
read_only
boolean
default:"False"

Whether the volume is read-only.

access_key
string
default:"None"

The Beam secret name for the S3 access key for the external provider.

secret_key
string
default:"None"

The Beam secret name for the S3 secret key for the external provider.

endpoint
string
default:"None"

The S3 endpoint for the external provider.

region
string
default:"None"

The region for the external provider.

Output

A file that a task has created.

Use this to save a file that you want to access or share later.

from beam import Image as BeamImage, Output, function


@function(
    image=BeamImage(
        python_packages=[
            "pillow",
        ],
    ),
)
def save_image():
    from PIL import Image as PILImage

    # Generate PIL image
    pil_image = PILImage.new(
        "RGB", (100, 100), color="white"
    )  # Creating a 100x100 white image

    # Save image file
    output = Output.from_pil_image(pil_image)
    output.save()

    # Retrieve pre-signed URL for output file
    url = output.public_url(expires=400)
    print(url)

    # Print other details about the output
    print(f"Output ID: {output.id}")
    print(f"Output Path: {output.path}")
    print(f"Output Stats: {output.stat()}")
    print(f"Output Exists: {output.exists()}")

    return {"image": url}


if __name__ == "__main__":
    save_image()

When you run this function, it will return a pre-signed URL to the image:

https://app.stage.beam.cloud/output/id/abe0c95a-2cd1-40b3-bace-9225f2c79c6d
expires
int
default:"3600"

The length of time the pre-signed URL will be available for. The file will be automatically deleted after this period.

Files

Saving a file and generating a public URL.

from beam import Output

myfile = "path/to/my.txt"
output = Output(path=myfile)
output.save()
output_url = output.public_url()

PIL Images

Saving a PIL.Image object.

from beam import Output

image = pipe( ... )  # e.g., a PIL.Image returned by your pipeline
output = Output.from_pil_image(image)
output.save()

Directories

Saving a directory.

from pathlib import Path

from beam import Output

mydir = Path("/volumes/myvol/mydir")  # or use a str
output = Output(path=mydir)
output.save()

Experimental

Signal

Creates a Signal instance. Signals can be used to notify a container to perform specific actions using a flag.

For example, signals can reload global state, send a webhook, or terminate the container.

This is a great tool for automated retraining and deployment.

from beam import Signal, endpoint

# Setting up a consumer of a signal
some_global_model = None

def load_model():
    global some_global_model
    some_global_model = LoadIt()  # placeholder for your model-loading logic

s = Signal(name="reload-model", handler=load_model, clear_after_interval=5)

@endpoint(on_start=load_model)
def handler(**kwargs):
    global some_global_model
    return some_global_model(kwargs["param1"])

# Trigger load_model to execute again while the container is still running
s = Signal(name="reload-model")
s.set(ttl=60)
name
string
default:"None"

The name of the signal.

handler
Callable
default:"None"

A function to be called when the signal is set. If not provided, no handler will be executed.

clear_after_interval
int
default:"None"

The number of seconds after which the signal will be automatically cleared if both handler and clear_after_interval are set.

Integrations

vllm

A wrapper around the vLLM library that allows you to deploy it as an ASGI app.

from beam import integrations

e = integrations.VLLMArgs()
e.device = "cpu"
e.chat_template = "./chatml.jinja"

vllm_app = integrations.VLLM(name="vllm-abstraction-1", vllm_args=e)
cpu
float
default:"1.0"

The number of CPU cores allocated to the container.

memory
string
default:"128"

The amount of memory allocated to the container. It should be specified in MiB, or as a string with units (e.g., “1Gi”).

gpu
string
default:"GpuType.NoGPU"

The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU is required, leave it empty.

image
string
default:"Image"

The container image used for task execution. This will include an add_python_packages call with ["fastapi", "vllm", "huggingface_hub"] added to ensure vLLM can run.

workers
int
default:"1"

The number of workers to run in the container.

concurrent_requests
int
default:"1"

The maximum number of concurrent requests the container can handle.

keep_warm_seconds
int
default:"60"

The number of seconds to keep the container warm after the last request.

max_pending_tasks
int
default:"100"

The maximum number of pending tasks allowed in the container.

timeout
int
default:"3600"

The maximum number of seconds to wait for the container to start.

authorized
boolean
default:"True"

Whether the endpoints require authorization.

name
string
default:"None"

The name of the container. If not specified, you must provide it during deployment.

volumes
list
default:"['vllm_cache']"

The volumes to mount into the container. Default is a single volume named “vllm_cache” mounted to “./vllm_cache”, used as the download directory for vLLM models.

secrets
list
default:"None"

A list of secrets to pass to the container. To enable Hugging Face authentication for downloading models, set the HF_TOKEN in the secrets.

autoscaler
Autoscaler
default:"QueueDepthAutoscaler()"

The autoscaler to use for scaling container deployments.

vllm_args
VLLMArgs
default:"None"

The arguments to configure the vLLM model.

Utils

env

You can use env.is_remote() to only import Python packages when your app is running remotely. This is used to avoid import errors, since your Beam app might be using Python packages that aren’t installed on your local computer.

from beam import env

if env.is_remote():
    import torch

The alternative to env.is_remote() is to import packages inline in your functions, as shown below.
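
A sketch of the inline approach:

from beam import function

@function()
def run_inference():
    import torch  # only imported inside the remote container

    return str(torch.__version__)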