This section explains how you can stream the logs from your Beam deployments to a third-party service.

Connecting to the real-time API

Your Beam logs are streamed over a real-time WebSocket endpoint at wss://realtime.beam.cloud.

Streaming the logs

You can start streaming logs by sending the following JSON payload to the real-time API. Make sure to customize this payload with your Beam auth_token, deployment_id, and task ID. The startingTimestamp field is the time from which to begin streaming the logs.

{
  "action": "LOGS_ADD_STREAM",
  "token": "your_auth_token",
  "objectId": "latest_deployment_id",
  "objectType": "BEAM_DEPLOYMENT",
  "streamType": "LOGS_STREAM",
  "stream": true,
  "startingTimestamp": "time_to_begin_streaming",
  "taskId": "your_task_id"
}

Example Usage

Here’s an end-to-end example in Python which demonstrates the following:

  • Querying the /task API to retrieve the deployment ID for the log stream (the example uses the current time as the starting timestamp)
  • Connecting to the real-time API via websocket
  • Sending a command to the real-time API to begin streaming the deployment logs
import json
import os
import requests
import websocket
import threading
from datetime import datetime
from time import sleep

# Auth token used for both the REST API session and the real-time stream payload.
api_auth_token = os.getenv("BEAM_AUTH_TOKEN")
if not api_auth_token:
    # Fail early with an actionable message; otherwise the header concatenation
    # below raises an opaque TypeError when the variable is unset.
    raise RuntimeError("The BEAM_AUTH_TOKEN environment variable must be set")

api_url = "https://api.beam.cloud/v1"
wss_url = "wss://realtime.beam.cloud/"

# Create a session object to persist the Authorization header across requests
session = requests.Session()
session.headers.update({"Authorization": "identity " + api_auth_token})


def connect_to_server(address):
    """Open a websocket connection to ``address`` and return the client.

    Args:
        address: The ``ws://`` or ``wss://`` URL to connect to.

    Returns:
        The connected ``websocket.WebSocket`` instance.

    Raises:
        ConnectionError: If ``connect()`` returns but the client does not
            report itself as connected.
    """
    ws = websocket.WebSocket()
    # connect() blocks until the handshake has finished (and raises on
    # failure), so there is nothing to poll for afterwards.
    ws.connect(address)

    # The connection flag lives on the WebSocket object itself; ``ws.sock`` is
    # the underlying raw socket and has no ``connected`` attribute.
    if not ws.connected:
        ws.close()
        raise ConnectionError(f"Could not connect to {address}")
    return ws


def open_stream(task_id):
    """Print the deployment logs for ``task_id`` as they are streamed.

    Polls the task endpoint until the task is no longer ``PENDING``, then
    opens a websocket to the real-time API, subscribes to the log stream of
    the task's deployment, and prints each log hit's timestamp and message.
    Blocks in ``run_forever()`` until the websocket connection ends.

    Args:
        task_id: ID of the task whose deployment logs should be streamed.

    Raises:
        requests.HTTPError: If the task endpoint returns an error status.
    """
    # Poll in a loop rather than recursing, so a task that stays pending for
    # a long time cannot exhaust the call stack.
    while True:
        task_data_response = session.get(f"{api_url}/task/{task_id}")
        # Surface HTTP errors here instead of failing later on a missing key.
        task_data_response.raise_for_status()
        task = task_data_response.json()
        if task["status"] != "PENDING":
            break
        sleep(0.2)  # Sleep for 200 milliseconds before retrying

    latest_deployment = task["app_deployment_id"]

    def on_open(ws):
        # Subscribe as soon as the connection is up; the starting timestamp
        # is the local time at which the subscription is sent.
        ws.send(
            json.dumps(
                {
                    "streamType": "LOGS_STREAM",
                    "token": api_auth_token,
                    "objectId": latest_deployment,
                    "objectType": "BEAM_DEPLOYMENT",
                    "startingTimestamp": datetime.now().isoformat(),
                    "taskId": task_id,
                    "action": "LOGS_ADD_STREAM",
                    "stream": True,
                }
            )
        )

    def on_message(ws, message):
        data = json.loads(message)
        # Messages without a logs.hits.hits list are ignored.
        hits = data.get("logs", {}).get("hits", {}).get("hits", [])

        for hit in hits:
            # Cap each printed message at 3000 characters.
            print(hit["_source"]["@timestamp"], hit["_source"]["msg"][:3000])

    # WebSocketApp is the callback-driven client: it dispatches on_open and
    # on_message and provides run_forever(). A plain WebSocket does neither.
    ws = websocket.WebSocketApp(wss_url, on_open=on_open, on_message=on_message)
    ws.run_forever()


# Usage example (assuming the task_id is known and valid)
# task_id = 'your_task_id_here'
# open_stream(task_id)