Web Endpoints
Streaming Deployment Logs
This section explains how you can stream the logs from your Beam deployments to a third-party service.
Connecting to the real-time API
Your Beam logs are streamed through a real-time websocket called wss://realtime.beam.cloud
.
Streaming the logs
You can start streaming logs by sending the following JSON payload to the real-time API. Make sure to customize this payload with your Beam auth_token
, and deployment_id
. startingTimestamp
is the time from which to begin streaming the logs.
{
"action": "LOGS_ADD_STREAM",
"token": "your_auth_token",
"objectId": "latest_deployment_id",
"objectType": "BEAM_DEPLOYMENT",
"streamType": "LOGS_STREAM",
"stream": true,
"startingTimestamp": "time_to_begin_streaming",
"taskId": "your_task_id"
}
Example Usage
Here’s an end-to-end example in Python which demonstrates the following:
- Querying the
/task
API to retrieve a deployment ID and starting timestamp for the log stream - Connecting to the real-time API via websocket
- Sending a command to the real-time API to begin streaming the deployment logs
import json
import os
import requests
import websocket
import threading
from datetime import datetime
from time import sleep
api_auth_token = os.getenv("BEAM_AUTH_TOKEN")
api_url = "https://api.beam.cloud/v1"
wss_url = "wss://realtime.beam.cloud/"
# Create a session object to persist session across requests
session = requests.Session()
session.headers.update({"Authorization": "identity " + api_auth_token})
def connect_to_server(address):
ws = websocket.WebSocket()
ws.connect(address)
# Wait for the connection to be established
while ws.sock.connected:
if ws.sock and ws.sock.connected:
return ws
def open_stream(task_id):
task_data_response = session.get(f"{api_url}/task/{task_id}")
task = task_data_response.json()
if task["status"] == "PENDING":
sleep(0.2) # Sleep for 200 milliseconds before retrying
open_stream(task_id)
return
latest_deployment = task["app_deployment_id"]
ws = connect_to_server(wss_url)
def on_message(ws, message):
data = json.loads(message)
hits = data.get("logs", {}).get("hits", {}).get("hits", [])
for hit in hits:
print(hit["_source"]["@timestamp"], hit["_source"]["msg"][:3000])
ws.on_message = on_message
ws.send(
json.dumps(
{
"streamType": "LOGS_STREAM",
"token": api_auth_token,
"objectId": latest_deployment,
"objectType": "BEAM_DEPLOYMENT",
"startingTimestamp": datetime.now().isoformat(),
"taskId": task_id,
"action": "LOGS_ADD_STREAM",
"stream": True,
}
)
)
ws.run_forever()
# Usage example (assuming the task_id is known and valid)
# task_id = 'your_task_id_here'
# open_stream(task_id)
Was this page helpful?