#!/usr/bin/env python3
"""Security regression harness for encrypted_chat server.

Runs focused pentest/integration checks against a live server using real
accounts.
"""

from __future__ import annotations

import argparse
import asyncio
import getpass
import os
import ssl
import sys
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

# Make the directory one level above this file's directory importable so the
# lazy ``chat_core`` import below can resolve.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

if TYPE_CHECKING:
    from chat_core import ChatClient


@dataclass
class TestResult:
    """Outcome of a single harness check, printed in the final report."""

    # The name starts with "Test", which matches pytest's default class
    # pattern; opt out so importing this module never triggers collection.
    __test__ = False

    name: str
    outcome: str  # PASS | FAIL | SKIP
    details: str


def _msg(resp: dict) -> str:
    """Return ``resp["data"]["message"]`` as a string, or ``""`` if absent.

    A missing, empty, or non-dict ``data`` payload yields ``""`` so that a
    malformed server reply is reported by the calling check instead of
    crashing the harness with ``AttributeError``.
    """
    data = resp.get("data")
    if not isinstance(data, dict):
        return ""
    return str(data.get("message", ""))


async def _connect_client() -> "ChatClient":
    """Create a ``ChatClient``, connect it and start its background listener."""
    from chat_core import ChatClient  # Imported lazily so --help works without full deps

    client = ChatClient()
    await client.connect()
    client._listener_task = asyncio.create_task(client._background_listener())
    return client


async def _login_client(email: str, password: str) -> tuple["ChatClient", str]:
    """Connect a new client and log it in.

    Returns:
        ``(client, message)`` where ``message`` is the second element
        returned by ``client.login``.

    Raises:
        RuntimeError: if ``client.login`` reports failure.
        Any exception raised by ``client.login`` itself is propagated.

    In every failure case the freshly connected client is closed first, so
    neither the connection nor its listener task is leaked.
    """
    client = await _connect_client()
    try:
        ok, message = await client.login(email, password)
    except BaseException:
        # Includes cancellation / KeyboardInterrupt: release the connection
        # before letting the error continue upwards.
        await _close_client(client)
        raise
    if not ok:
        # Best-effort close so a close() error cannot mask the login failure.
        await _close_client(client)
        raise RuntimeError(f"Login failed for {email}: {message}")
    return client, message


async def _close_client(client: "ChatClient | None"):
    """Close ``client`` if it is set, ignoring any ``Exception`` from close()."""
    if not client:
        return
    try:
        await client.close()
    except Exception:
        # Deliberate best-effort cleanup: a failing close must not hide the
        # test results or the original error.
        pass


def _too_many_attempts(resp: dict) -> bool:
    """Return True if ``resp`` is an error whose message contains 'Too many attempts'."""
    return resp.get("status") == "error" and "Too many attempts" in _msg(resp)


async def test_conversation_isolation(outsider: "ChatClient", conv_id: str) -> TestResult:
    """Outsider must not access a conversation they are not a member of."""
    fake_mid = str(uuid.uuid4())
    checks: list[tuple[str, dict]] = [
        (
            "get_messages",
            await outsider.send_and_recv("get_messages", conversation_id=conv_id, limit=5, offset=0),
        ),
        (
            "mark_read",
            await outsider.send_and_recv("mark_read", conversation_id=conv_id, message_ids=[fake_mid]),
        ),
        (
            "send_message",
            await outsider.send_and_recv("send_message", conversation_id=conv_id),
        ),
    ]

    # Every endpoint must answer with an error mentioning "Not a member".
    failures: list[str] = []
    for endpoint, resp in checks:
        if resp.get("status") != "error" or "Not a member" not in _msg(resp):
            failures.append(f"{endpoint} -> status={resp.get('status')} message={_msg(resp)!r}")

    if failures:
        return TestResult("Conversation Isolation (AuthZ)", "FAIL", "; ".join(failures))
    return TestResult(
        "Conversation Isolation (AuthZ)",
        "PASS",
        "Outsider got 'Not a member' for get_messages, mark_read, send_message.",
    )


async def test_session_reset_no_shared(outsider: "ChatClient", peer_user_id: str) -> TestResult:
    """session_reset must be rejected without shared conversation."""
    resp = await outsider.send_and_recv("session_reset", peer_user_id=peer_user_id)
    if resp.get("status") == "error" and "No shared conversation" in _msg(resp):
        return TestResult("Session Reset Authorization", "PASS", "Rejected with 'No shared conversation'.")
    if resp.get("status") == "ok":
        # An "ok" reply is reported as SKIP rather than FAIL: the accounts may
        # legitimately share a conversation in the current dataset.
        return TestResult(
            "Session Reset Authorization",
            "SKIP",
            "Outsider appears to share a conversation with peer in current dataset.",
        )
    return TestResult(
        "Session Reset Authorization",
        "FAIL",
        f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}",
    )


async def test_malformed_header_rejected(member: "ChatClient", conv_id: str) -> TestResult:
    """Oversized ratchet header should be rejected by server-side validation."""
    huge_header = {"dh_pub": "A" * 5000, "n": 1, "pn": 0}
    resp = await member.send_and_recv(
        "send_message",
        conversation_id=conv_id,
        ratchet_header=huge_header,
        recipients=[{}],
    )
    if resp.get("status") == "error" and "Invalid ratchet_header format" in _msg(resp):
        return TestResult("Malformed Header Rejection", "PASS", "Oversized ratchet_header rejected.")
    return TestResult(
        "Malformed Header Rejection",
        "FAIL",
        f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}",
    )


async def test_login_rate_limits() -> TestResult:
    """Validate login_start per-email(case-insensitive) and per-IP limits."""
    probe = await _connect_client()
    try:
        stamp = int(time.time())
        base_local = f"pentest-login-{stamp}"
        base_domain = "example.invalid"
        base_email = f"{base_local}@{base_domain}"
        # Phase 1: eleven requests for the same address in different casings.
        case_variants = [
            base_email,
            f"{base_local.upper()}@{base_domain}",
            f"{base_local.capitalize()}@{base_domain}",
            f"{base_local}@{base_domain.upper()}",
            f"{base_local.swapcase()}@{base_domain}",
            base_email,
            f"{base_local.upper()}@{base_domain}",
            f"{base_local.capitalize()}@{base_domain}",
            f"{base_local}@{base_domain.upper()}",
            f"{base_local.swapcase()}@{base_domain}",
            base_email,  # should exceed per-email bucket (10/min)
        ]

        email_bucket_triggered = False
        phase1_last = ""
        for e in case_variants:
            resp = await probe.send_and_recv("login_start", email=e)
            phase1_last = _msg(resp)
            if _too_many_attempts(resp):
                email_bucket_triggered = True
            await asyncio.sleep(0.12)  # stay under per-connection 20 req/s limiter

        # Phase 2: distinct addresses, so only a limit that is not keyed on
        # the email can trigger here.
        ip_bucket_triggered = False
        phase2_last = ""
        for i in range(1, 16):
            unique_email = f"{base_local}-{i}@{base_domain}"
            resp = await probe.send_and_recv("login_start", email=unique_email)
            phase2_last = _msg(resp)
            if _too_many_attempts(resp):
                ip_bucket_triggered = True
                break
            await asyncio.sleep(0.12)

        if email_bucket_triggered and ip_bucket_triggered:
            return TestResult(
                "Login Rate Limits (case + per-IP)",
                "PASS",
                "Per-email(case-insensitive) and per-IP login_start limits both triggered.",
            )
        return TestResult(
            "Login Rate Limits (case + per-IP)",
            "FAIL",
            (
                f"email_bucket_triggered={email_bucket_triggered}, "
                f"ip_bucket_triggered={ip_bucket_triggered}, "
                f"phase1_last={phase1_last!r}, phase2_last={phase2_last!r}"
            ),
        )
    finally:
        await _close_client(probe)


# The checks above are named ``test_*`` but take live clients as arguments (or
# need a live server). Opt them out of pytest collection so that importing
# this module into a pytest session does not produce bogus fixture errors.
for _check in (
    test_conversation_isolation,
    test_session_reset_no_shared,
    test_malformed_header_rejected,
    test_login_rate_limits,
):
    _check.__test__ = False  # type: ignore[attr-defined]
del _check


def _pick_password(flag_value: str | None, prompt: str) -> str:
    """Return ``flag_value`` if given, otherwise prompt for it without echo."""
    if flag_value is not None:
        return flag_value
    return getpass.getpass(prompt)


async def run(args: argparse.Namespace) -> int:
    """Log in the three accounts, run all checks and print a report.

    Returns:
        ``2`` for invalid arguments/configuration, ``1`` if any result is
        FAIL (including harness setup errors), otherwise ``0``.
    """
    if len({args.member_email.lower(), args.peer_email.lower(), args.outsider_email.lower()}) != 3:
        print("ERROR: member/peer/outsider emails must be three different accounts.", file=sys.stderr)
        return 2

    # Overrides are passed on through the environment variables that are also
    # read back just below for the bind-address sanity check.
    if args.server_host:
        os.environ["SERVER_HOST"] = args.server_host
    if args.server_port is not None:
        os.environ["SERVER_PORT"] = str(args.server_port)

    effective_host = os.getenv("SERVER_HOST", "127.0.0.1").strip()
    if effective_host == "0.0.0.0":
        print(
            "ERROR: SERVER_HOST=0.0.0.0 je bind adresa serveru, ne klientský TLS hostname.\n"
            "Pouzij --server-host (napr. localhost nebo 192.168.1.112).",
            file=sys.stderr,
        )
        return 2

    member_password = _pick_password(args.member_password, f"Password for {args.member_email}: ")
    peer_password = _pick_password(args.peer_password, f"Password for {args.peer_email}: ")
    outsider_password = _pick_password(args.outsider_password, f"Password for {args.outsider_email}: ")

    member: "ChatClient | None" = None
    peer: "ChatClient | None" = None
    outsider: "ChatClient | None" = None
    results: list[TestResult] = []

    try:
        print("[setup] Logging in member account...")
        member, _ = await _login_client(args.member_email, member_password)
        print("[setup] Logging in peer account...")
        peer, _ = await _login_client(args.peer_email, peer_password)
        print("[setup] Logging in outsider account...")
        outsider, _ = await _login_client(args.outsider_email, outsider_password)

        if args.conversation_id:
            conv_id = args.conversation_id
        else:
            print("[setup] Finding/creating member<->peer direct conversation...")
            conv_id, err = await member.find_or_create_conversation(args.peer_email)
            if not conv_id:
                raise RuntimeError(f"Could not find/create conversation: {err}")

        # The isolation check is meaningless if the outsider is a member.
        outsider_convs = {c["conversation_id"] for c in await outsider.list_conversations()}
        if conv_id in outsider_convs:
            results.append(
                TestResult(
                    "Conversation Isolation (AuthZ)",
                    "SKIP",
                    "Outsider is already a member of target conversation; choose different outsider/account set.",
                )
            )
        else:
            results.append(await test_conversation_isolation(outsider, conv_id))

        results.append(await test_malformed_header_rejected(member, conv_id))
        results.append(await test_session_reset_no_shared(outsider, peer.session["user_id"]))

        if not args.skip_login_rate_limit:
            results.append(await test_login_rate_limits())
    except ssl.SSLCertVerificationError as e:
        results.append(
            TestResult(
                "Harness Setup",
                "FAIL",
                (
                    f"{e}. Zkus --server-host s hodnotou ze SAN/CN certifikatu "
                    "(napr. localhost nebo 192.168.1.112)."
                ),
            )
        )
    except Exception as e:
        # Top-level boundary: any other error is turned into a FAIL entry so
        # the report is still printed and the exit code is non-zero.
        emsg = str(e)
        if "CERTIFICATE_VERIFY_FAILED" in emsg or "IP address mismatch" in emsg:
            emsg += (
                " | Hint: pouzij --server-host s hostname/IP, ktery je v certifikatu "
                "(SERVER_HOST nesmi byt 0.0.0.0)."
            )
        results.append(TestResult("Harness Setup", "FAIL", emsg))
    finally:
        await _close_client(member)
        await _close_client(peer)
        await _close_client(outsider)

    print("\n=== Pentest Results ===")
    for r in results:
        print(f"[{r.outcome}] {r.name}: {r.details}")

    has_fail = any(r.outcome == "FAIL" for r in results)
    return 1 if has_fail else 0


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the harness."""
    parser = argparse.ArgumentParser(
        description="Run focused pentest/integration checks against encrypted_chat server."
    )
    parser.add_argument("--member-email", required=True, help="Email of regular account A (conversation member).")
    parser.add_argument("--peer-email", required=True, help="Email of regular account B (other conversation member).")
    parser.add_argument("--outsider-email", required=True, help="Email of regular account C (must not be in target conversation).")
    parser.add_argument(
        "--server-host",
        default=None,
        help="Override SERVER_HOST for this run (must match TLS cert SAN/CN).",
    )
    parser.add_argument(
        "--server-port",
        type=int,
        default=None,
        help="Override SERVER_PORT for this run.",
    )
    parser.add_argument("--member-password", default=None, help="Password for --member-email (optional; prompt if omitted).")
    parser.add_argument("--peer-password", default=None, help="Password for --peer-email (optional; prompt if omitted).")
    parser.add_argument("--outsider-password", default=None, help="Password for --outsider-email (optional; prompt if omitted).")
    parser.add_argument(
        "--conversation-id",
        default=None,
        help="Optional target conversation UUID. If omitted, member<->peer DM is found/created automatically.",
    )
    parser.add_argument(
        "--skip-login-rate-limit",
        action="store_true",
        help="Skip anonymous login_start rate-limit regression check.",
    )
    return parser.parse_args()


def main() -> int:
    """Script entry point: parse arguments and return the exit code of ``run``."""
    args = parse_args()
    return asyncio.run(run(args))


if __name__ == "__main__":
    raise SystemExit(main())