SDK Reference
Environment
Image
Defines a custom container image that your code will run in.
An Image object encapsulates the configuration of a custom container image that will be used as the runtime environment for executing tasks.
from beam import endpoint, Image
image = (
    Image(
        base_image="docker.io/nvidia/cuda:12.3.1-runtime-ubuntu20.04",
        python_version="python3.9",
    )
    .add_commands(["apt-get update -y", "apt-get install ffmpeg -y"])
    .add_python_packages(["transformers", "torch"])
)

@endpoint(image=image)
def handler():
    return {}
The Python version to be used in the image. Defaults to Python 3.8.
A list of Python packages to install in the container image. Alternatively, a string containing a path to a requirements.txt can be provided. Default is [].
A list of shell commands to run when building your container image. These commands can be used for setting up the environment, installing dependencies, etc. Default is [].
A custom base image to replace the default ubuntu20.04 image used in your container. For example: docker.io/library/ubuntu:20.04. This image must contain a valid Python executable that matches the version specified in python_version (i.e. python3.8, python3.9, etc.). Default is None.
If using a custom base image from a private registry, credentials can be passed as either a dict of key-value pairs, or a list of environment variable names exported in your shell.
List of Base Image Creds
image = Image(
    base_image="111111111111.dkr.ecr.us-east-1.amazonaws.com/my-app:latest",
    base_image_creds=[
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
        "AWS_REGION",
    ],
)
Dict of Base Image Creds
image = Image(
    base_image="111111111111.dkr.ecr.us-east-1.amazonaws.com/my-app:latest",
    base_image_creds={
        "AWS_ACCESS_KEY_ID": "xxxx",
        "AWS_SECRET_ACCESS_KEY": "xxxx",
        "AWS_REGION": "xxxx",
    },
)
Context
Context is a dataclass used to store various useful fields you might want to access in your entry point logic.
Field Name | Type | Default Value | Purpose |
---|---|---|---|
container_id | Optional[str] | None | Unique identifier for a container |
stub_id | Optional[str] | None | Identifier for a stub |
stub_type | Optional[str] | None | Type of the stub (function, endpoint, taskqueue, etc.) |
callback_url | Optional[str] | None | URL called when the task status changes |
task_id | Optional[str] | None | Identifier for the specific task |
timeout | Optional[int] | None | Maximum time allowed for the task to run (seconds) |
on_start_value | Optional[Any] | None | Any values returned from the on_start function |
bind_port | int | 0 | Port number to bind a service to |
python_version | str | "" | Version of Python to be used |
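For example, a handler can accept a context argument to read these fields at runtime. A minimal sketch (the exact values available depend on the stub type):

from beam import endpoint

@endpoint()
def handler(context):
    # Inspect task metadata and any values returned from on_start
    print(context.task_id)
    print(context.on_start_value)
    return {"task_id": context.task_id}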
Callables
Function
Decorator for defining a remote function.
This method allows you to run the decorated function in a remote container.
from beam import Image, function

@function(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    print(filename)
    return "some_result"

# Call the function in a remote container
transcribe.remote("some_file.mp4")

# Map the function over several inputs
# Each of these inputs will be routed to remote containers
for result in transcribe.map(["file1.mp4", "file2.mp4"]):
    print(result)
The number of CPU cores allocated to the container.
The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).
The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.
The container image used for the task execution.
The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.
The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.
The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.
The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
The maximum number of times a task will be retried if the container crashes.
A list of volumes to be mounted to the container.
A list of secrets that are injected into the container as environment variables.
An optional name for this function, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.
Remote
You can run any function remotely on Beam by using the .remote() method:
from beam import function

@function(cpu=8)
def square(i: int):
    return i**2

if __name__ == "__main__":
    # Run the `square` function remotely on Beam
    result = square.remote(i=12)
    print(result)
The code above is invoked by running python example.py:
(.venv) user@MacBook % python example.py
=> Building image
=> Using cached image
=> Syncing files
=> Files synced
=> Running function: <example:square>
=> Function complete <908c76b1-ee68-4b33-ac3a-026ae646625f>
144
Map
You can scale out workloads to many containers using the .map() method. You might use this for parallelizing compute-heavy tasks, such as batch inference or data processing jobs.
from beam import function

@function(cpu=8)
def square(i: int):
    return i**2

def main():
    numbers = list(range(10))
    squared = []

    # Run a remote container for every item in the list
    for result in square.map(numbers):
        squared.append(result)

if __name__ == "__main__":
    main()
Schedule
This method allows you to schedule the decorated function to run at specific intervals defined by a cron expression.
from beam import schedule

@schedule(when="*/5 * * * *", name="scheduled-job")
def task():
    print("Hi, from scheduled task!")
The cron expression or predefined schedule that determines when the task will run. This parameter defines the interval or specific time when the task should execute.
The number of CPU cores allocated to the container.
The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).
The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.
The container image used for the task execution.
The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.
The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.
The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.
The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
The maximum number of times a task will be retried if the container crashes.
A list of volumes to be mounted to the container.
A list of secrets that are injected into the container as environment variables.
An optional name for this scheduled job, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.
Scheduling Options
Predefined Schedule | Description | Cron Expression |
---|---|---|
@yearly (or @annually) | Run once a year at midnight on January 1st | 0 0 1 1 * |
@monthly | Run once a month at midnight on the first day of the month | 0 0 1 * * |
@weekly | Run once a week at midnight on Sunday | 0 0 * * 0 |
@daily (or @midnight) | Run once a day at midnight | 0 0 * * * |
@hourly | Run once an hour at the beginning of the hour | 0 * * * * |
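For example, a predefined schedule can be used in place of a raw cron expression. A minimal sketch (the function body is illustrative):

from beam import schedule

@schedule(when="@hourly", name="hourly-job")
def hourly_job():
    print("Runs at the top of every hour")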
Endpoint
Decorator used for deploying a web endpoint.
from beam import endpoint, Image

@endpoint(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def multiply(**inputs):
    result = inputs["x"] * 2
    return {"result": result}
The number of CPU cores allocated to the container.
The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).
The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.
The container image used for the task execution.
The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.
The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.
The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.
The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.
A function that runs when the container first starts. The return values of the on_start function can be retrieved by passing a context argument to your handler function (a sketch follows this parameter list).
A list of volumes to be mounted to the container.
A list of secrets that are injected into the container as environment variables.
An optional name for this endpoint, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.
If false, allows the endpoint to be invoked without an auth token.
The maximum number of times a task will be retried if the container crashes.
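As referenced above, here is a minimal sketch of the on_start pattern (the load_model name and its return value are illustrative):

from beam import endpoint

def load_model():
    # Runs once when the container first starts
    return "my-model"

@endpoint(on_start=load_model)
def handler(context, **inputs):
    # Retrieve whatever on_start returned
    model = context.on_start_value
    return {"model": model}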
Serve
beam serve monitors changes in your local file system, live-reloads the remote environment as you work, and forwards remote container logs to your local shell.
Serve is great for prototyping. You can develop in a containerized cloud environment in real time, with adjustable CPU, memory, and GPU resources.
It’s also great for testing an app before deploying it. Served functions are orchestrated identically to deployments, which means you can test your Beam workflow end-to-end before deploying.
To start an ephemeral serve session, you'll use the serve command:
beam serve app.py
By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.
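For example, a .beamignore in your project root might look like this (assuming .gitignore-style patterns):

.venv/
__pycache__/
*.mp4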
Task Queue
Decorator for defining a task queue.
This method allows you to create a task queue out of the decorated function.
The tasks are executed asynchronously. You can interact with the task queue either through an API (when deployed), or directly in Python through the .put() method.
from beam import Image, task_queue

# Define the task queue
@task_queue(
    cpu=1.0,
    memory=128,
    gpu="T4",
    image=Image(python_packages=["torch"]),
    keep_warm_seconds=1000,
)
def transcribe(filename: str):
    return {}

transcribe.put("some_file.mp4")
The number of CPU cores allocated to the container.
The amount of memory allocated to the container. It should be specified in megabytes (e.g., 128 for 128 megabytes).
The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty.
The container image used for the task execution.
The maximum number of seconds a task can run before it times out. Default is 180. Set it to -1 to disable the timeout.
The number of concurrent tasks to handle per container. Modifying this parameter can improve throughput for certain workloads. Workers will share the CPU, Memory, and GPU defined. You may need to increase these values to increase concurrency.
The duration in seconds to keep the task queue warm even if there are no pending tasks. Keeping the queue warm helps to reduce the latency when new tasks arrive. Default is 10s.
The maximum number of tasks that can be pending in the queue. If the number of pending tasks exceeds this value, the task queue will stop accepting new tasks.
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
The maximum number of times a task will be retried if the container crashes.
A list of volumes to be mounted to the container.
A list of secrets that are injected into the container as environment variables.
An optional name for this task queue, used during deployment. If not specified, you must specify the name at deploy time with the --name argument.
Autoscaling
QueueDepthAutoscaler
Adds an autoscaler to an app.
from beam import Image, QueueDepthAutoscaler, task_queue

@task_queue(
    workers=2,
    image=Image(python_version="python3.8", python_packages=["pandas", "csaps"]),
    autoscaler=QueueDepthAutoscaler(max_containers=5, tasks_per_container=1),
)
def handler():
    import pandas as pd
    import time

    print(pd)
    time.sleep(5)
    return {"result": True}
The max number of tasks that can be queued up to a single container. This can help manage throughput and the cost of compute. When tasks_per_container is 0, a container can process any number of tasks.
The maximum number of containers that the autoscaler can create. It defines an upper limit to avoid excessive resource consumption.
Data Structures
Simple Queue
Creates a Queue instance.
Use this as a concurrency-safe distributed queue, accessible both locally and within remote containers.
Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is that of a standard Python queue.
Because this is backed by a distributed queue, it will persist between runs.
from beam import Queue

val = [1, 2, 3]

# Initialize the queue
q = Queue(name="myqueue")

for i in range(100):
    # Insert something into the queue
    q.put(val)

while not q.empty():
    # Remove something from the queue
    val = q.pop()
    print(val)
The name of the queue (any arbitrary string).
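Because the queue is shared, a remote function and your local process can pass values through it. A minimal sketch (the worker function is illustrative):

from beam import Queue, function

q = Queue(name="results")

@function()
def worker(x: int):
    # Push a result onto the shared queue from the remote container
    q.put(x * 2)

if __name__ == "__main__":
    worker.remote(21)
    print(q.pop())  # 42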
Map
Creates a Map instance.
Use this as a concurrency-safe key/value store, accessible both locally and within remote containers.
Serialization is done using cloudpickle, so any object supported by cloudpickle should work here. The interface is that of a standard Python dictionary.
Because this is backed by a distributed dictionary, it will persist between runs.
from beam import Map

# Name the map
m = Map(name="test")

# Set a key
m["some_key"] = True

# Delete a key
del m["some_key"]

# Iterate through the map
for k, v in m.items():
    print("key: ", k)
    print("value: ", v)
The name of the map (any arbitrary string).
Storage
Beam allows you to create highly-available storage volumes that can be used across tasks. You might use volumes for things like storing model weights or large datasets.
Volume
Creates a Volume instance.
When your container runs, your volume will be available at ./{name} and /volumes/{name}.
from beam import function, Volume

VOLUME_PATH = "./model_weights"

@function(
    volumes=[Volume(name="model-weights", mount_path=VOLUME_PATH)],
)
def load_model():
    from transformers import AutoModel

    # Load model from cloud storage cache
    AutoModel.from_pretrained(VOLUME_PATH)
The name of the volume, a descriptive identifier for the data volume.
The path where the volume is mounted within the container environment.
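Files written under the mount path persist across runs. A minimal sketch of writing to a volume (the file name and contents are illustrative):

from beam import function, Volume

@function(
    volumes=[Volume(name="model-weights", mount_path="./model_weights")],
)
def save_checkpoint():
    # Anything written under the mount path persists between runs
    with open("./model_weights/checkpoint.txt", "w") as f:
        f.write("step=100")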
CloudBucket
Creates a CloudBucket instance.
When your container runs, your cloud bucket will be available at ./{name} and /volumes/{name}.
from beam import CloudBucket, CloudBucketConfig, function

# Cloud bucket
weights = CloudBucket(
    name="weights",
    mount_path="./weights",
    config=CloudBucketConfig(
        access_key="my-access-key",
        secret_key="my-secret-key",
        endpoint="https://s3-endpoint.com",
    ),
)

@function(volumes=[weights])
def my_function():
    pass
The name of the cloud bucket; it must match the bucket name in the cloud provider.
The path where the cloud bucket is mounted within the container environment.
Configuration for the cloud bucket.
CloudBucketConfig
Configuration for a cloud bucket.
from beam import CloudBucketConfig

config = CloudBucketConfig(
    read_only=False,
    access_key="my-access-key",
    secret_key="my-secret-key",
    endpoint="https://s3-endpoint.com",
    region="us-west-2",
)
Whether the volume is read-only.
The name of the Beam secret containing the S3 access key for the external provider.
The name of the Beam secret containing the S3 secret key for the external provider.
The S3 endpoint for the external provider.
The region for the external provider.
Output
A file that a task has created.
Use this to save a file that you may want to share later.
from beam import Image as BeamImage, Output, function

@function(
    image=BeamImage(
        python_packages=[
            "pillow",
        ],
    ),
)
def save_image():
    from PIL import Image as PILImage

    # Generate a PIL image
    pil_image = PILImage.new(
        "RGB", (100, 100), color="white"
    )  # Creating a 100x100 white image

    # Save the image file
    output = Output.from_pil_image(pil_image)
    output.save()

    # Retrieve a pre-signed URL for the output file
    url = output.public_url(expires=400)
    print(url)

    # Print other details about the output
    print(f"Output ID: {output.id}")
    print(f"Output Path: {output.path}")
    print(f"Output Stats: {output.stat()}")
    print(f"Output Exists: {output.exists()}")

    return {"image": url}

if __name__ == "__main__":
    save_image()
When you run this function, it will return a pre-signed URL to the image:
https://app.stage.beam.cloud/output/id/abe0c95a-2cd1-40b3-bace-9225f2c79c6d
The length of time the pre-signed URL will be available for. The file will be automatically deleted after this period.
Files
Saving a file and generating a public URL.
myfile = "path/to/my.txt"
output = Output(path=myfile)
output.save()
output_url = output.public_url()
PIL Images
Saving a PIL.Image object.
image = pipe( ... )
output = Output.from_pil_image(image)
output.save()
Directories
Saving a directory.
from pathlib import Path

mydir = Path("/volumes/myvol/mydir")  # or use a str

output = Output(path=mydir)
output.save()
Utils
env
You can use env.is_remote() to only import Python packages when your app is running remotely. This is used to avoid import errors, since your Beam app might be using Python packages that aren't installed on your local computer.
from beam import env

if env.is_remote():
    import torch
The alternative to env.is_remote() is to import packages inline in your functions. For more information on this topic, visit this page.
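For example, the inline-import alternative looks like this (a minimal sketch; the function body is illustrative):

from beam import Image, function

@function(image=Image(python_packages=["torch"]))
def predict():
    # Import inside the function so the package only needs
    # to exist in the remote container, not locally
    import torch

    return torch.__version__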