159 lines
6.8 KiB
Python
Executable File
159 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
JWT generator for self‑hosted Jitsi (also compatible with JAAS if desired)
|
||
- HS256 (HMAC) signing using only Python standard library (no external deps).
|
||
- Flags to omit exp/nbf (test tokens), include iat, and read secret from file/STDIN.
|
||
- Flags to populate context.features: recording, livestreaming, transcription, sip-in/out.
|
||
- Robust URL construction (escapes the room name).
|
||
"""
|
||
import argparse, base64, hashlib, hmac, json, time, sys
|
||
from urllib.parse import quote
|
||
|
||
def b64url(data: bytes) -> str:
|
||
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
|
||
|
||
def sign_hs256(secret: str, signing_input: str) -> str:
|
||
sig = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()
|
||
return b64url(sig)
|
||
|
||
def main():
|
||
p = argparse.ArgumentParser(description="JWT generator for Jitsi (HS256)")
|
||
# Identity / target
|
||
p.add_argument("--app-id", required=True, help="app_id configured in Prosody/JAAS")
|
||
p.add_argument("--secret", required=False, help="app_secret (HMAC/HS256)")
|
||
p.add_argument("--secret-file", help="Read secret from file or '-' for STDIN")
|
||
p.add_argument("--domain", help="Jitsi domain (e.g. meet.example.com) used as 'sub' in self-hosted")
|
||
p.add_argument("--room", default="*", help="Target room (or '*' for all)")
|
||
# Time
|
||
p.add_argument("--minutes", type=int, default=60, help="Validity (minutes). Ignored if --no-exp")
|
||
p.add_argument("--no-exp", action="store_true", help="Do not include 'exp' (tests only)")
|
||
p.add_argument("--nbf-offset", type=int, default=10, help="Backdating seconds for 'nbf' (default: 10)")
|
||
p.add_argument("--no-nbf", action="store_true", help="Do not include 'nbf' (tests only)")
|
||
p.add_argument("--include-iat", action="store_true", help="Include 'iat'=now")
|
||
# User
|
||
p.add_argument("--user-name", default=None, help="User display name")
|
||
p.add_argument("--user-email", default=None, help="User email")
|
||
p.add_argument("--user-id", default=None, help="User unique ID")
|
||
p.add_argument("--avatar", default=None, help="Avatar URL")
|
||
p.add_argument("--moderator", action="store_true", help="Grant moderator role via token")
|
||
p.add_argument("--moderator-as-string", action="store_true",
|
||
help="Use 'moderator': 'true'/'false' (string) instead of boolean")
|
||
# Features (self-hosted with enableFeaturesBasedOnToken)
|
||
p.add_argument("--feature-recording", action="store_true", help="Enable 'recording' in context.features")
|
||
p.add_argument("--feature-livestreaming", action="store_true", help="Enable 'livestreaming' in context.features")
|
||
p.add_argument("--feature-transcription", action="store_true", help="Enable 'transcription' in context.features")
|
||
p.add_argument("--feature-sip-in", action="store_true", help="Enable 'sip-inbound-call' in context.features")
|
||
p.add_argument("--feature-sip-out", action="store_true", help="Enable 'sip-outbound-call' in context.features")
|
||
p.add_argument("--features-all", action="store_true", help="Enable all the features above")
|
||
# Overrides / modes
|
||
p.add_argument("--aud", default=None, help="Override 'aud' (default: app_id in self-hosted)")
|
||
p.add_argument("--iss", default=None, help="Override 'iss' (default: app_id in self-hosted)")
|
||
p.add_argument("--jaas", action="store_true",
|
||
help="JAAS mode: aud='jitsi', iss='chat', sub=app_id (ignores --domain for 'sub')")
|
||
# Output
|
||
p.add_argument("--url", default=None,
|
||
help="If provided (e.g. 'https://meet.example.com/'), prints full join URL with ?jwt=")
|
||
p.add_argument("--print-json", action="store_true", help="Print payload JSON to STDERR (debug)")
|
||
|
||
args = p.parse_args()
|
||
|
||
# Secret: --secret-file takes precedence
|
||
secret = args.secret
|
||
if args.secret_file:
|
||
if args.secret_file == "-":
|
||
secret = sys.stdin.read().strip()
|
||
else:
|
||
with open(args.secret_file, "r", encoding="utf-8") as fh:
|
||
secret = fh.read().strip()
|
||
if not secret:
|
||
p.error("You must provide --secret or --secret-file (or --secret-file - for STDIN).")
|
||
|
||
now = int(time.time())
|
||
exp = None if args.no_exp else (now + args.minutes * 60)
|
||
nbf = None if args.no_nbf else (now - max(args.nbf_offset, 0))
|
||
|
||
# Header
|
||
header = {"typ": "JWT", "alg": "HS256"}
|
||
|
||
# Base claims by mode
|
||
if args.jaas:
|
||
aud = "jitsi"
|
||
iss = "chat"
|
||
sub = args.app_id
|
||
else:
|
||
if not args.domain:
|
||
p.error("--domain is required in self-hosted mode (without --jaas).")
|
||
aud = args.aud or args.app_id
|
||
iss = args.iss or args.app_id
|
||
sub = args.domain
|
||
|
||
# User / contexto
|
||
user = {}
|
||
if args.user_id: user["id"] = args.user_id
|
||
if args.user_name: user["name"] = args.user_name
|
||
if args.user_email: user["email"] = args.user_email
|
||
if args.avatar: user["avatar"] = args.avatar
|
||
if args.moderator:
|
||
if args.moderator_as_string:
|
||
user["moderator"] = "true"
|
||
else:
|
||
user["moderator"] = True
|
||
|
||
# Features
|
||
features = {}
|
||
if args.features_all:
|
||
features = {
|
||
"recording": True,
|
||
"livestreaming": True,
|
||
"transcription": True,
|
||
"sip-inbound-call": True,
|
||
"sip-outbound-call": True
|
||
}
|
||
else:
|
||
if args.feature_recording: features["recording"] = True
|
||
if args.feature_livestreaming: features["livestreaming"] = True
|
||
if args.feature_transcription: features["transcription"] = True
|
||
if args.feature_sip_in: features["sip-inbound-call"] = True
|
||
if args.feature_sip_out: features["sip-outbound-call"] = True
|
||
|
||
context = {}
|
||
if user: context["user"] = user
|
||
if features: context["features"] = features
|
||
|
||
payload = {
|
||
"aud": aud,
|
||
"iss": iss,
|
||
"sub": sub,
|
||
"room": args.room,
|
||
}
|
||
if context:
|
||
payload["context"] = context
|
||
if exp is not None:
|
||
payload["exp"] = exp
|
||
if nbf is not None:
|
||
payload["nbf"] = nbf
|
||
if args.include_iat:
|
||
payload["iat"] = now
|
||
|
||
# Build JWT manually
|
||
signing_input = f"{b64url(json.dumps(header, separators=(',', ':'), ensure_ascii=False).encode())}." \
|
||
f"{b64url(json.dumps(payload, separators=(',', ':'), ensure_ascii=False).encode())}"
|
||
signature = sign_hs256(secret, signing_input)
|
||
token = f"{signing_input}.{signature}"
|
||
|
||
if args.print_json:
|
||
print(json.dumps(payload, indent=2, ensure_ascii=False), file=sys.stderr)
|
||
|
||
if args.url:
|
||
base = args.url if args.url.endswith("/") else args.url + "/"
|
||
room_path = "" if args.room == "*" else quote(args.room, safe="")
|
||
join_url = base + room_path
|
||
sep = "&" if "?" in join_url else "?"
|
||
print(f"{join_url}{sep}jwt={token}")
|
||
else:
|
||
print(token)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|