# File-viewer metadata (not part of the program): 36 lines, 1.1 KiB, Python
import os
|
|
from livekit import api
|
|
|
|
# Connection settings come from the process environment; each value is
# None when the corresponding variable is not set.
LIVEKIT_URL = os.environ.get("LIVEKIT_URL")
LIVEKIT_API_KEY = os.environ.get("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.environ.get("LIVEKIT_API_SECRET")

# Single API client built once at import time and reused by the helpers
# in this module.
lkapi = api.LiveKitAPI(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
|
|
|
|
def create_access_token(identity: str, name: str, metadata: str, room_name: str) -> str:
    """Return a JWT access token for one participant in one room.

    The token is built with the module-level ``LIVEKIT_API_KEY`` and
    ``LIVEKIT_API_SECRET`` and carries the given identity, display name and
    metadata. Its video grants allow joining ``room_name``, publishing
    tracks, publishing data and subscribing.

    Args:
        identity: Participant identity stored in the token.
        name: Participant name stored in the token.
        metadata: Metadata string stored in the token.
        room_name: The room the join grant is scoped to.

    Returns:
        The encoded token, as produced by ``AccessToken.to_jwt()``.
    """
    builder = api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
    builder = builder.with_identity(identity)
    builder = builder.with_name(name)
    builder = builder.with_metadata(metadata)

    # Permissions granted to the holder of this token.
    grants = api.VideoGrants(
        room_join=True,
        room=room_name,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
    )
    builder = builder.with_grants(grants)

    return builder.to_jwt()
|
|
|
|
async def create_room_if_not_exists(room_name: str):
    """Create the room ``room_name``, ignoring an "already exists" failure.

    Sends a ``CreateRoomRequest`` through the module-level ``lkapi`` client.
    An ``api.RoomError`` whose message contains ``"room already exists"`` is
    swallowed; any other ``api.RoomError`` is re-raised to the caller.

    Args:
        room_name: Name of the room to create.

    Raises:
        api.RoomError: When room creation fails for any reason other than
            the room already existing.
    """
    # NOTE(review): confirm that the installed livekit-api version exports
    # `api.RoomError`; verify which exception type `create_room` raises.
    try:
        request = api.CreateRoomRequest(name=room_name)
        await lkapi.room.create_room(request)
    except api.RoomError as err:
        # An existing room is the desired end state, so only unrelated
        # failures are propagated.
        if "room already exists" not in str(err):
            raise err