If you supply a callback_url argument to your function decorator, Beam will make a POST request to your server whenever a task finishes running. Callbacks fire for both successful and failed tasks.

Callbacks include the Beam Task ID in the request headers, and the task response URL-encoded in the request body.

For testing purposes, you can setup a temporary webhook URL using https://webhook.site

Registering a Callback URL

Callbacks can be added onto endpoints, functions, and task queues:

from beam import function


@function(callback_url="https://your-server.io")
def handler(x):
    return {"result": x}

if __name__ == "__main__":
    handler.remote(x=10)

Callback format

Data Payload

The callback will send the response from your function as JSON, in the data field:

{
  "data": {
    "result": 10
  }
}

Request headers

The request headers include the following fields:

  • x-task-timestamp — timestamp the task was created.
  • x-task-signature — signature to verify that the request was sent from Beam.
  • x-task-status — status of the task.
  • x-task-id — the task ID.

Request Level Callbacks

There are cases where you might want to define a different callback_url for each request, for example if you have different environments for staging and prod.

You can pass callback_url as a payload to anything you’re running on Beam, and we’ll use that as the callback for the request:

curl -X POST \
  --compressed 'https://multiply-712408b-v1.app.beam.cloud' \
  -H 'Authorization: [YOUR_AUTH_TOKEN]' \
  -H 'Content-Type: application/json' \
  -d '{"callback_url": "https://webhook.site/341d3777-cdd0-4c7e-82cb-dcc06ea4f774"}'

When using request-level callbacks, you must include either the callback_url value or kwargs (**inputs) as input to the handler function:

from beam import endpoint


@endpoint()
def handler(callback_url): # Make sure to pass this value!
    return {"response": "true"}

@endpoint()
def handler(**inputs): # You can use kwargs too
    return {"response": "true"}

Verifying Requests

Timestamp Verification

To secure your server against replay attacks, a timestamp and signature are included in the callback request headers.

As a best-practice, it is wise to check the timestamp header of each callback request. If the timestamp is over 5s old, there is a risk that the callback was not fired from Beam.

Signature Verification

The most secure way of verifying a callback request is through signature verification.

Your Signature Token can be found in the dashboard, on the Settings -> General page.

Validating a Signature

The callback request will include a header field called x-task-signature.

x-task-signature is a unique signature generated by converting the request body to base64, concatenating it with the timestamp, and signing it with your Beam API token.

The code below shows how to validate a callback signature:

import base64
import hashlib
import hmac


def verify_signature(
    request_body: bytes, secret_key: str, timestamp: int, signature: str
):
    # Encode request body to Base64
    base64_payload = base64.b64encode(request_body).decode()

    # Create data to sign by concatenating base64 payload with timestamp
    data_to_sign = f"{base64_payload}:{timestamp}"

    # Initialize HMAC with SHA256 and secret key
    h = hmac.new(secret_key.encode(), data_to_sign.encode(), hashlib.sha256)

    # Compute the HMAC signature
    computed_signature = h.hexdigest()
    assert signature == computed_signature