Sending Events Between Apps
This is an experimental feature: we're still evaluating its usefulness, and it may not remain in the product long-term. Please try it out and let us know what you think!
There are certain cases where you’ll want to send events between different apps running on Beam.
A common scenario is a model inference and retraining pipeline, where the inference app (App 2) needs to use the latest version of a model trained by the retraining app (App 1).
View the Code
See the code for this example on GitHub.
Invoking Functions in Other Apps
This example demonstrates how to invoke functions in other apps on Beam. Specifically, we cover the scenario with an inference and a retraining function.
The retraining function needs a way to tell the inference function to use the latest model.
We use an experimental.Signal(), which is a special type of event listener that can be triggered from the retrain function.
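To make the flow concrete, here is a toy, in-process model of the set / handler / clear cycle. It is not Beam's implementation: `ToySignal`, `_FLAGS`, and `poll()` are hypothetical names invented for this sketch, and a plain dictionary stands in for whatever shared state Beam uses between apps. The only thing the sender and the listener have in common is the signal name.

```python
import time

# Hypothetical stand-in for state shared between apps; not part of Beam.
_FLAGS = {}


class ToySignal:
    """Toy model of a named signal; illustrative only, not Beam's API."""

    def __init__(self, name, handler=None):
        self.name = name
        self.handler = handler

    def set(self, ttl):
        # Mark the signal as set; it expires after `ttl` seconds.
        _FLAGS[self.name] = time.monotonic() + ttl

    def clear(self):
        # Remove the flag so later polls do nothing.
        _FLAGS.pop(self.name, None)

    def poll(self):
        # Run the handler if the flag is set and has not expired.
        expires_at = _FLAGS.get(self.name)
        if expires_at is None or time.monotonic() >= expires_at:
            return False
        if self.handler is not None:
            self.handler()
        return True


reloads = []


def reload_model():
    reloads.append("reloaded")
    listener.clear()  # clear so the handler does not run again


# "Inference app": subscribes to the signal by name.
listener = ToySignal(name="reload-model", handler=reload_model)

# "Retraining app": a separate object that shares only the name.
sender = ToySignal(name="reload-model")

before = listener.poll()  # nothing has been set yet, so no handler call
sender.set(ttl=60)
fired = listener.poll()   # the flag is set, so the handler runs
after = listener.poll()   # the handler cleared the flag, so nothing runs
```

In this model the handler runs exactly once, because it clears the flag itself. That mirrors the pattern used in the inference app, where the handler calls `s.clear()`.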
App 1: Retraining App
This is the retraining app. Below, we register a Signal that will fire an event to our inference app, which is subscribed to this Signal event.
from beam import endpoint, experimental


@endpoint(name="trainer")
def train():
    # Send a signal to another app letting it know that it needs to reload the models
    s = experimental.Signal(name="reload-model")
    s.set(ttl=60)
App 2: Inference App
Below is the inference app, which needs to re-run its on_start function (load_latest_model) when retraining is finished.
You’ll notice that a Signal is registered with a handler that tells us which function to run when an event is fired.
from beam import endpoint, Volume, experimental, Image

VOLUME_NAME = "brand_classifier"
CACHE_PATH = f"./{VOLUME_NAME}-cache"


def load_latest_model():
    # Preload and return the model and tokenizer
    global model, tokenizer
    print("Loading latest...")
    model = lambda x: x + 1  # This is just example code

    s.clear()  # Clear the signal so it doesn't fire again


# Set a signal handler - when invoked, it will run the handler function
s = experimental.Signal(
    name="reload-model",
    handler=load_latest_model,
)


@endpoint(
    name="inference",
    volumes=[Volume(name=VOLUME_NAME, mount_path=CACHE_PATH)],
    image=Image(python_packages=["transformers", "torch"]),
    on_start=load_latest_model,
)
def predict(**inputs):
    global model, tokenizer  # These will have the latest values

    return {"success": "true"}
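The `load_latest_model` body above is a placeholder. As a rough sketch of what a real one might start with, the helper below picks the most recently modified file in a directory. It assumes the retraining app writes checkpoint files into the shared volume, which the example does not show. `find_latest_checkpoint` and the `.pkl` naming are hypothetical, and a temporary directory stands in for the mounted volume path.

```python
import os
import tempfile
from pathlib import Path


def find_latest_checkpoint(cache_dir, pattern="*.pkl"):
    """Return the most recently modified file matching `pattern`, or None."""
    candidates = list(Path(cache_dir).glob(pattern))
    if not candidates:
        return None
    return max(candidates, key=lambda p: p.stat().st_mtime)


# Small self-check using a temporary directory in place of the mounted volume.
with tempfile.TemporaryDirectory() as tmp:
    old = Path(tmp) / "model-v1.pkl"
    new = Path(tmp) / "model-v2.pkl"
    old.write_bytes(b"old")
    new.write_bytes(b"new")
    # Set explicit modification times so the ordering is deterministic.
    os.utime(old, (1_000, 1_000))
    os.utime(new, (2_000, 2_000))
    latest_name = find_latest_checkpoint(tmp).name
    empty_result = find_latest_checkpoint(tmp, pattern="*.bin")
```

Inside a handler, the returned path could then be passed to whatever loading routine your model format requires. Returning `None` when nothing matches lets the handler keep the current model instead of failing.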
To test this example, you can open two terminal windows:
- In window 1, serve and invoke the inference function
- In window 2, serve and invoke the retrain function
Look at the logs in window 1: you'll notice that the signal has fired and load_latest_model ran again.
Clearing Signals
Signals refresh every second while a container is running, until signal.clear() is called. We recommend calling signal.clear() after each signal invocation.
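The effect of skipping `clear()` can be illustrated with a small simulation. This is a hypothetical model, not Beam's implementation: each loop iteration stands in for one one-second refresh, a dictionary entry stands in for the signal flag, and the TTL is ignored for simplicity.

```python
def run_ticks(ticks, clear_in_handler):
    """Simulate a container checking a set signal once per tick.

    Hypothetical model, not Beam's implementation: `state["set"]` stands in
    for the signal flag, and each loop iteration stands in for one refresh.
    """
    state = {"set": True}  # the retraining app has already set the signal
    calls = 0

    def handler():
        nonlocal calls
        calls += 1
        if clear_in_handler:
            state["set"] = False  # stands in for calling clear()

    for _ in range(ticks):
        if state["set"]:
            handler()
    return calls


without_clear = run_ticks(ticks=5, clear_in_handler=False)
with_clear = run_ticks(ticks=5, clear_in_handler=True)
print(without_clear, with_clear)  # 5 1
```

Under this model, a handler that never clears the signal runs on every refresh, so an expensive model reload would repeat each second. Clearing inside the handler limits it to one run per signal.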