Environment

Image

Defines a custom container image that your code will run in.

An Image object encapsulates the configuration of a custom container image that will be used as the runtime environment for executing tasks.

from beam import endpoint, Image


image = Image(
    python_version="python3.9",
    python_packages=[
        "transformers",
        "torch",
    ],
    commands=["apt-get update -y && apt-get install ffmpeg -y"],
    base_image="docker.io/nvidia/cuda:12.3.1-runtime-ubuntu20.04",
)


@endpoint(image=image)
def handler():
    return {}
python_version
string
default: "3.8"

The Python version to be used in the image. Defaults to Python 3.8.

python_packages
list
default: "None"

A list of Python packages to install in the container image. Alternatively, a string containing a path to a requirements.txt can be provided. Default is [].

commands
list
default: "None"

A list of shell commands to run when building your container image. These commands can be used for setting up the environment, installing dependencies, etc. Default is [].

base_image
string
default: "None"

A custom base image to replace the default Ubuntu 20.04 image used in your container, for example docker.io/library/ubuntu:20.04. This image must contain a valid Python executable that matches the version specified in python_version (e.g. python3.8, python3.9). Default is None.
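If you point base_image at a custom image, it can help to confirm locally that the image actually ships the interpreter named by python_version. The sketch below assumes the Docker CLI is installed on your machine; the helper names are hypothetical and nothing here is part of the Beam SDK. It accepts both the "3.8" and "python3.9" spellings that appear on this page.

```python
import re
import shutil
import subprocess
from typing import List


def interpreter_name(python_version: str) -> str:
    """Normalize "3.9" or "python3.9" to the executable name "python3.9"."""
    if python_version.startswith("python"):
        version = python_version[len("python"):]
    else:
        version = python_version
    if not re.fullmatch(r"3\.\d+", version):
        raise ValueError(f"unrecognized python_version: {python_version!r}")
    return f"python{version}"


def check_command(base_image: str, python_version: str) -> List[str]:
    """Build a `docker run` command that prints the interpreter version inside the image."""
    return ["docker", "run", "--rm", base_image, interpreter_name(python_version), "--version"]


def image_has_interpreter(base_image: str, python_version: str) -> bool:
    """Return True if the interpreter runs inside the image (requires a local Docker CLI)."""
    if shutil.which("docker") is None:
        raise RuntimeError("docker CLI not found on PATH")
    result = subprocess.run(
        check_command(base_image, python_version), capture_output=True, text=True
    )
    return result.returncode == 0
```

For example, image_has_interpreter("docker.io/library/ubuntu:20.04", "python3.8") runs python3.8 --version inside that image; a non-zero exit status suggests the interpreter is missing and the image would need to install it.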

Callables

Function

Decorator for defining a remote function.

This method allows you to run the decorated function in a remote container.

from beam import Image, function


@function(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    print(filename)
    return "some_result"


if __name__ == "__main__":
    # Call the function in a remote container
    transcribe.remote("some_file.mp4")

    # Map the function over several inputs;
    # each input is routed to a remote container
    for result in transcribe.map(["file1.mp4", "file2.mp4"]):
        print(result)
cpu
float
default: "1.0"

The number of CPU cores allocated to the container.

memory
int
default: "128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default: "GpuType.NoGPU"

The type or name of the GPU to use for GPU-accelerated tasks. Leave it unset if no GPU is required.

image
Image
default: "Image"

The container image used for task execution.

timeout
float
default: "180"

The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout.
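The timeout rule can be mimicked locally to reason about it. This is a sketch using concurrent.futures, not how Beam enforces timeouts: a Python thread cannot be forcibly stopped, so the local version only stops waiting for the result while the overrunning call finishes in the background. The -1 convention follows the parameter description above; run_with_timeout is a hypothetical helper.

```python
import concurrent.futures
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_timeout(fn: Callable[[], T], timeout: float) -> T:
    """Call fn, giving up after `timeout` seconds; -1 disables the limit."""
    if timeout == -1:
        return fn()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        # result() raises concurrent.futures.TimeoutError if fn overruns
        return executor.submit(fn).result(timeout=timeout)
    finally:
        # Don't block on an overrunning call; its thread finishes in the background
        executor.shutdown(wait=False)


def slow_task() -> str:
    time.sleep(0.5)
    return "done"
```

Calling run_with_timeout(slow_task, timeout=0.05) raises concurrent.futures.TimeoutError, while the same call with timeout=-1 waits for "done".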

concurrency
int
default: "1"

The number of concurrent tasks to handle per container. Raising this value can improve throughput for some workloads. Concurrent workers share the CPU, memory, and GPU allocated to the container, so you may need to increase those values as well.

max_containers
int
default: "1"

The maximum number of containers that the task queue will autoscale to. If the number of tasks in the queue goes over the concurrency value, the task queue will automatically add containers to handle the load.
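To build intuition for how concurrency and max_containers interact, here is a simplified model: each container absorbs up to concurrency tasks, and containers are added for the overflow until max_containers is reached. This rule is an assumption for illustration, not Beam's actual scheduler, whose exact policy is not documented here; desired_containers is a hypothetical helper.

```python
import math


def desired_containers(pending_tasks: int, concurrency: int, max_containers: int) -> int:
    """Simplified scaling rule: one container per `concurrency` pending tasks, capped."""
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    if pending_tasks <= 0:
        return 0
    needed = math.ceil(pending_tasks / concurrency)
    return min(needed, max_containers)
```

Under this model, five pending tasks with concurrency=2 and max_containers=10 call for three containers, while fifty pending tasks with concurrency=1 and max_containers=4 stay capped at four.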

keep_warm_seconds
int
default: "300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.

max_pending_tasks
int
default: "100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.
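The rejection behaviour resembles a bounded queue. The sketch below uses Python's queue.Queue, with maxsize standing in for max_pending_tasks; it only illustrates the rule described above, and how Beam reports a rejected task to the caller is not specified here. try_enqueue is a hypothetical helper.

```python
import queue


def try_enqueue(q: "queue.Queue[str]", task: str) -> bool:
    """Accept the task if there is room, otherwise reject it instead of blocking."""
    try:
        q.put_nowait(task)
        return True
    except queue.Full:
        return False


pending: "queue.Queue[str]" = queue.Queue(maxsize=2)  # stands in for max_pending_tasks=2
accepted = [try_enqueue(pending, name) for name in ["a.mp4", "b.mp4", "c.mp4"]]
```

Here the first two tasks are accepted and the third is turned away, leaving two tasks pending.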

callback_url
string
default: "None"

An optional URL to send a callback to when a task is completed, timed out, or cancelled.

retries
int
default: "3"

The maximum number of times a task will be retried if the container crashes.
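One way to read retries is "an initial attempt plus up to retries re-runs". The sketch below models that reading locally, with a raised exception standing in for a container crash; whether Beam counts the first attempt the same way, and whether it also retries on ordinary exceptions, is not stated above. run_with_retries is a hypothetical helper.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], retries: int) -> T:
    """Run task once, then retry up to `retries` more times if it raises."""
    last_error: Exception = RuntimeError("task never ran")
    for _ in range(retries + 1):
        try:
            return task()
        except Exception as exc:  # stands in for a container crash
            last_error = exc
    raise last_error


attempts: List[int] = []


def flaky() -> str:
    """Fails on the first two attempts, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated crash")
    return "ok"


result = run_with_retries(flaky, retries=3)
```

With retries=3 the flaky task succeeds on its third attempt; with retries=1 it would have failed after two attempts.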

volumes
list
default: "None"

A list of volumes to be mounted to the container.

Remote

You can run any function remotely on Beam by using the .remote() method:

from beam import function


@function(cpu=8)
def square(i: int):
    return i**2


if __name__ == "__main__":
    # Run the `square` function remotely on Beam
    result = square.remote(i=12)
    print(result)

The code above is invoked by running python example.py:

(.venv) user@MacBook % python example.py
=> Building image
=> Using cached image
=> Syncing files
=> Files synced
=> Running function: <example:square>
=> Function complete <908c76b1-ee68-4b33-ac3a-026ae646625f>
144

Map

You can scale out workloads to many containers using the .map() method. You might use this to parallelize computationally heavy tasks, such as batch inference or data processing jobs.

from beam import function


@function(cpu=8)
def square(i: int):
    return i**2


def main():
    numbers = list(range(10))
    squared = []

    # Run a remote container for every item in the list
    for result in square.map(numbers):
        squared.append(result)

    print(squared)


if __name__ == "__main__":
    main()
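If you want to try the fan-out pattern without deploying anything, Python's concurrent.futures offers a rough local analogue, with threads standing in for remote containers. This is an illustration only: the local version returns results in input order, whereas the ordering guarantees of Beam's .map() are not stated on this page. local_map is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def square(i: int) -> int:
    return i**2


def local_map(numbers: Iterable[int], max_workers: int = 4) -> List[int]:
    """Fan square() out across worker threads, one call per input."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, even if calls finish out of order
        return list(pool.map(square, numbers))
```

local_map(range(10)) returns the same squares as the remote example, computed on local threads.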

Endpoint

Decorator used for deploying a web endpoint.

from beam import endpoint, Image


@endpoint(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(**inputs):
    result = inputs["x"] * 2
    return {"result": result}
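Because the handler takes **inputs, you can exercise its logic locally by calling it with keyword arguments. The sketch below assumes the endpoint's JSON request body is unpacked into those keyword arguments; handle_request is a hypothetical helper, not part of Beam.

```python
import json


def multiply(**inputs):
    # Same body as the endpoint above, minus the @endpoint decorator
    result = inputs["x"] * 2
    return {"result": result}


def handle_request(body: str) -> dict:
    """Decode a JSON body and pass its fields to the handler as keyword arguments."""
    payload = json.loads(body)
    return multiply(**payload)
```

Under that assumption, a request body of {"x": 21} would produce {"result": 42}; a body without "x" raises KeyError, which is worth handling before deploying.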
cpu
float
default: "1.0"

The number of CPU cores allocated to the container.

memory
int
default: "128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default: "GpuType.NoGPU"

The type or name of the GPU to use for GPU-accelerated tasks. Leave it unset if no GPU is required.

image
Image
default: "Image"

The container image used for task execution.

timeout
float
default: "180"

The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout.

concurrency
int
default: "1"

The number of concurrent tasks to handle per container. Raising this value can improve throughput for some workloads. Concurrent workers share the CPU, memory, and GPU allocated to the container, so you may need to increase those values as well.

max_containers
int
default: "1"

The maximum number of containers that the task queue will autoscale to. If the number of tasks in the queue goes over the concurrency value, the task queue will automatically add containers to handle the load.

keep_warm_seconds
int
default: "300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.

max_pending_tasks
int
default: "100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

on_start
Function
default: "None"

A function that runs when the container first starts. The return values of the on_start function can be retrieved by passing a context argument to your handler function.
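The point of on_start is to do expensive setup, such as loading a model, once per container rather than once per request. The sketch below imitates that flow locally. The attribute name on_start_value is an assumption about where the on_start return value lives on the context object, and FakeContext and FakeContainer are hypothetical stand-ins, not Beam classes.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

setup_calls: List[int] = []


def load_model() -> Dict[str, str]:
    """Placeholder for expensive setup; records each call so we can count them."""
    setup_calls.append(1)
    return {"name": "demo-model"}


@dataclass
class FakeContext:
    # Assumed attribute name for the on_start return value
    on_start_value: Any = None


def handler(context: FakeContext, **inputs: Any) -> Dict[str, Any]:
    model = context.on_start_value  # reuse the object created at startup
    return {"model": model["name"], "x": inputs["x"]}


class FakeContainer:
    """Runs on_start once at startup, then serves any number of requests."""

    def __init__(self, on_start: Callable[[], Any], fn: Callable[..., Any]) -> None:
        self.context = FakeContext(on_start_value=on_start())
        self.fn = fn

    def call(self, **inputs: Any) -> Any:
        return self.fn(self.context, **inputs)


container = FakeContainer(on_start=load_model, fn=handler)
responses = [container.call(x=1), container.call(x=2)]
```

Both requests reuse the same loaded object, and load_model runs only once.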

volumes
list
default: "None"

A list of volumes to be mounted to the container.

Serve

beam serve functions like an Ngrok server that live-reloads as you work. Beam monitors changes in your local file system and forwards the remote container logs to your local shell.

Serve is great for prototyping. You can develop in a containerized cloud environment in real time, with adjustable CPU, memory, and GPU resources.

It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.

To start an ephemeral serve session, you’ll use the serve command:

beam serve app.py
Sessions end automatically after 10 minutes of inactivity.

By default, Beam syncs all the files in your working directory to the remote container, so you can use your local files while developing. To prevent some files from being uploaded, create a .beamignore file.

Task Queue

Decorator for defining a task queue.

This method allows you to create a task queue out of the decorated function.

The tasks are executed asynchronously. You can interact with the task queue either through an API (when deployed), or directly in Python through the .put() method.

from beam import Image, task_queue


# Define the task queue
@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    return {}


transcribe.put("some_file.mp4")
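Calling .put() only enqueues work; a container picks it up later. The sketch below mimics that asynchronous hand-off locally with queue.Queue and a background thread standing in for a task queue container. It is a local analogy, not how Beam schedules tasks, and transcribe here is a stub.

```python
import queue
import threading
from typing import Dict, List

tasks: "queue.Queue[str]" = queue.Queue()
results: List[Dict[str, str]] = []


def transcribe(filename: str) -> Dict[str, str]:
    return {"file": filename}  # stub for the real work


def worker() -> None:
    # Consume tasks forever, like a long-running container
    while True:
        filename = tasks.get()
        try:
            results.append(transcribe(filename))
        finally:
            tasks.task_done()


threading.Thread(target=worker, daemon=True).start()

for name in ["a.mp4", "b.mp4"]:
    tasks.put(name)  # returns immediately; processing happens in the background

tasks.join()  # block until every queued task has been processed
```

With a single worker, results come back in the order the tasks were enqueued; with several workers that ordering would no longer be guaranteed.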
cpu
float
default: "1.0"

The number of CPU cores allocated to the container.

memory
int
default: "128"

The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).

gpu
string
default: "GpuType.NoGPU"

The type or name of the GPU to use for GPU-accelerated tasks. Leave it unset if no GPU is required.

image
Image
default: "Image"

The container image used for task execution.

timeout
float
default: "180"

The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout.

concurrency
int
default: "1"

The number of concurrent tasks to handle per container. Raising this value can improve throughput for some workloads. Concurrent workers share the CPU, memory, and GPU allocated to the container, so you may need to increase those values as well.

max_containers
int
default: "1"

The maximum number of containers that the task queue will autoscale to. If the number of tasks in the queue goes over the concurrency value, the task queue will automatically add containers to handle the load.

keep_warm_seconds
int
default: "300"

The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.

max_pending_tasks
int
default: "100"

The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.

callback_url
string
default: "None"

An optional URL to send a callback to when a task is completed, timed out, or cancelled.

retries
int
default: "3"

The maximum number of times a task will be retried if the container crashes.

volumes
list
default: "None"

A list of volumes to be mounted to the container.


Data Structures

Simple Queue

Creates a Queue instance.

Use this as a concurrency-safe distributed queue, accessible both locally and within remote containers.

Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is similar to that of a standard Python queue.

Because this is backed by a distributed queue, it will persist between runs.

from beam import Queue

val = [1, 2, 3]

# Initialize the Queue
q = Queue(name="myqueue")

for i in range(100):
    # Insert something to the queue
    q.put(val)
while not q.empty():
    # Remove something from the queue
    val = q.pop()
    print(val)
name
string
default: "None"
required

The name of the queue (any arbitrary string).
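Because values are serialized on put, what you pop is a copy of the value as it was when it was enqueued, not a live reference. The in-memory stand-in below illustrates this with the standard library's pickle in place of cloudpickle and mirrors the put/pop/empty calls used in the example above. LocalQueue is hypothetical and not distributed, and the FIFO ordering it uses is an assumption.

```python
import pickle
from collections import deque
from typing import Any, Deque


class LocalQueue:
    """Hypothetical in-memory stand-in mirroring the put/pop/empty calls above."""

    def __init__(self) -> None:
        self._items: Deque[bytes] = deque()

    def put(self, value: Any) -> None:
        self._items.append(pickle.dumps(value))  # stdlib pickle in place of cloudpickle

    def pop(self) -> Any:
        return pickle.loads(self._items.popleft())  # FIFO order is an assumption

    def empty(self) -> bool:
        return not self._items


q = LocalQueue()
val = [1, 2, 3]
q.put(val)
val.append(4)  # mutating after put does not change the queued copy
popped = q.pop()
```

The popped value is still [1, 2, 3], even though the local list was changed after it was enqueued.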

Map

Creates a Map instance.

Use this as a concurrency-safe key/value store, accessible both locally and within remote containers.

Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is that of a standard Python dictionary.

Because this is backed by a distributed dictionary, it will persist between runs.

from beam import Map

# Name the map
m = Map(name="test")

# Set a key
m["some_key"] = True

# Delete a key
del m["some_key"]

# Iterate through the map
for k, v in m.items():
    print("key: ", k)
    print("value: ", v)
name
string
default: "None"
required

The name of the map (any arbitrary string).
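One consequence of serializing values is that nested objects you read back are copies. Mutating them in place does not update the stored value; you need to assign the modified value back to the key. The stand-in below demonstrates this locally with the standard library's pickle in place of cloudpickle. LocalMap is hypothetical, and even with a concurrency-safe store, a read-modify-write sequence like this may not be atomic when several containers update the same key.

```python
import pickle
from typing import Any, Dict, Iterator, Tuple


class LocalMap:
    """Hypothetical in-memory stand-in with dict-style access; values stored pickled."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = pickle.dumps(value)

    def __getitem__(self, key: str) -> Any:
        return pickle.loads(self._data[key])

    def __delitem__(self, key: str) -> None:
        del self._data[key]

    def items(self) -> Iterator[Tuple[str, Any]]:
        for key, raw in self._data.items():
            yield key, pickle.loads(raw)


m = LocalMap()
m["counts"] = {"a": 1}

# Pitfall: this mutates a deserialized copy, so the stored value does not change
m["counts"]["a"] += 1
stale = m["counts"]["a"]

# Fix: read, modify, then write the whole value back
counts = m["counts"]
counts["a"] += 1
m["counts"] = counts
```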

Storage

Volume

Creates a Volume instance.

When your container runs, your volume will be available at ./{name} and /volumes/{name}.

from beam import function, Volume


VOLUME_PATH = "./model_weights"


@function(
    volumes=[Volume(name="model-weights", mount_path=VOLUME_PATH)],
)
def load_model():
    from transformers import AutoModel

    # Load model from cloud storage cache
    AutoModel.from_pretrained(VOLUME_PATH)
name
string
default: "None"
required

The name of the volume, a descriptive identifier for the data volume.

mount_path
string
default: "None"
required

The path where the volume is mounted within the container environment.
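Since volume contents persist between runs, a common pattern is to check the volume for a file before downloading it. The sketch below shows that check-then-fetch logic with a temporary directory standing in for the mounted volume; fetch_weights is a hypothetical placeholder for a real download, and the file name weights.bin is an assumption.

```python
import tempfile
from pathlib import Path
from typing import List

fetch_calls: List[int] = []


def fetch_weights() -> bytes:
    """Hypothetical download step; records calls so the caching is visible."""
    fetch_calls.append(1)
    return b"fake-weights"


def load_or_fetch(volume_dir: Path, filename: str = "weights.bin") -> bytes:
    path = volume_dir / filename
    if path.exists():
        return path.read_bytes()  # cache hit: reuse data already in the volume
    data = fetch_weights()
    volume_dir.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)  # persist so later runs can skip the download
    return data


with tempfile.TemporaryDirectory() as tmp:
    volume = Path(tmp) / "model_weights"  # stands in for the mount_path
    first = load_or_fetch(volume)
    second = load_or_fetch(volume)
```

The first call downloads and writes the file; the second reads it back from the volume without downloading again.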