diff --git a/certs/cloudflare.ini.example b/certs/cloudflare.ini.example new file mode 100644 index 0000000..2fd8c67 --- /dev/null +++ b/certs/cloudflare.ini.example @@ -0,0 +1,11 @@ +# Cloudflare API token pro certbot DNS challenge +# 1. Jdi na https://dash.cloudflare.com/profile/api-tokens +# 2. Create Token -> Edit zone DNS (template) +# 3. Zone Resources: vybrat svou doménu +# 4. Zkopírovat token sem +# +# Po vyplnění přejmenuj na cloudflare.ini a nastav práva: +# cp cloudflare.ini.example cloudflare.ini +# chmod 600 cloudflare.ini + +dns_cloudflare_api_token = VLOZ_SVUJ_CLOUDFLARE_API_TOKEN diff --git a/tests/PENTEST_CLIENT.md b/tests/PENTEST_CLIENT.md new file mode 100644 index 0000000..3cd3202 --- /dev/null +++ b/tests/PENTEST_CLIENT.md @@ -0,0 +1,79 @@ +# `tests/pentest_client.py` + +Automatizovaný pentest/integration harness nad živým serverem s reálnými účty. + +## Co test dělá + +1. **Conversation Isolation (AuthZ)** + - Účet `outsider` zkouší `get_messages`, `mark_read` a `send_message` do konverzace, kde není členem. + - Očekávání: server vrátí `error` + `"Not a member"`. + +2. **Malformed Header Rejection** + - Platný člen konverzace pošle `send_message` s obřím `ratchet_header`. + - Očekávání: server odmítne request (`Invalid ratchet_header format`), tj. funguje `_validate_header` limit. + +3. **Session Reset Authorization** + - `outsider` pošle `session_reset` na `peer_user_id`. + - Očekávání: `error` + `"No shared conversation"`. + - Pokud účty sdílenou konverzaci opravdu mají, test se označí jako `SKIP` (setup issue, ne nutně bezpečnostní chyba). + +4. **Login Rate Limits (volitelné)** + - Anonymní klient spamuje `login_start`: + - stejný email v různých kombinacích velikosti písmen (test case-normalization bucketu), + - potom rotace různých emailů ze stejné IP (test per-IP bucketu). + - Očekávání: aktivuje se jak per-email limit, tak per-IP limit. + +## Požadavky + +- Běžící server (`server.py`). +- Existující lokální klíče pro účty v `~/.encrypted_chat//` (stejné jako pro běžného CLI klienta). +- 3 různé účty: + - `member` (A), + - `peer` (B), + - `outsider` (C). + +## Spuštění + +```bash +python3 tests/pentest_client.py \ + --server-host localhost \ + --member-email alice@example.com \ + --peer-email bob@example.com \ + --outsider-email mallory@example.com +``` + +Skript si vyžádá hesla interaktivně. Lze je předat i argumenty: + +```bash +python3 tests/pentest_client.py \ + --server-host localhost \ + --member-email alice@example.com --member-password '***' \ + --peer-email bob@example.com --peer-password '***' \ + --outsider-email mallory@example.com --outsider-password '***' +``` + +Volby: + +- `--conversation-id `: použije konkrétní konverzaci místo auto member<->peer DM. +- `--skip-login-rate-limit`: přeskočí test `login_start` limiteru. +- `--server-host `: přepíše `SERVER_HOST` pro tento běh. +- `--server-port `: přepíše `SERVER_PORT` pro tento běh. + +Poznámka k TLS: + +- Pokud máš v `.env` `SERVER_HOST=0.0.0.0`, je to správně pro server bind, ale klient na to nesmí přistupovat přes TLS. +- Pro klienta použij `--server-host` s hodnotou, která je v certifikátu (SAN/CN), typicky `localhost` nebo konkrétní IP. + +## Výstup + +Skript tiskne souhrn: + +- `[PASS]` test prošel, +- `[FAIL]` test selhal (potenciální regrese), +- `[SKIP]` test nelze vyhodnotit kvůli dataset/setup podmínkám. + +Návratový kód: + +- `0` = bez failu, +- `1` = alespoň jeden fail, +- `2` = chyba vstupních parametrů. diff --git a/tests/pentest_client.py b/tests/pentest_client.py new file mode 100644 index 0000000..502153a --- /dev/null +++ b/tests/pentest_client.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +"""Security regression harness for encrypted_chat server. + +Runs focused pentest/integration checks against a live server using real accounts. +""" + +from __future__ import annotations + +import argparse +import asyncio +import getpass +import os +import ssl +import sys +import time +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +if TYPE_CHECKING: + from chat_core import ChatClient + + +@dataclass +class TestResult: + name: str + outcome: str # PASS | FAIL | SKIP + details: str + + +def _msg(resp: dict) -> str: + data = resp.get("data") or {} + return str(data.get("message", "")) + + +async def _connect_client() -> "ChatClient": + from chat_core import ChatClient # Imported lazily so --help works without full deps + client = ChatClient() + await client.connect() + client._listener_task = asyncio.create_task(client._background_listener()) + return client + + +async def _login_client(email: str, password: str) -> tuple["ChatClient", str]: + client = await _connect_client() + ok, message = await client.login(email, password) + if not ok: + await client.close() + raise RuntimeError(f"Login failed for {email}: {message}") + return client, message + + +async def _close_client(client: "ChatClient | None"): + if not client: + return + try: + await client.close() + except Exception: + pass + + +def _too_many_attempts(resp: dict) -> bool: + return resp.get("status") == "error" and "Too many attempts" in _msg(resp) + + +async def test_conversation_isolation(outsider: "ChatClient", conv_id: str) -> TestResult: + """Outsider must not access a conversation they are not a member of.""" + fake_mid = str(uuid.uuid4()) + checks: list[tuple[str, dict]] = [ + ( + "get_messages", + await outsider.send_and_recv("get_messages", conversation_id=conv_id, limit=5, offset=0), + ), + ( + "mark_read", + await outsider.send_and_recv("mark_read", conversation_id=conv_id, message_ids=[fake_mid]), + ), + ( + "send_message", + await outsider.send_and_recv("send_message", conversation_id=conv_id), + ), + ] + failures: list[str] = [] + for endpoint, resp in checks: + if resp.get("status") != "error" or "Not a member" not in _msg(resp): + failures.append(f"{endpoint} -> status={resp.get('status')} message={_msg(resp)!r}") + if failures: + return TestResult("Conversation Isolation (AuthZ)", "FAIL", "; ".join(failures)) + return TestResult( + "Conversation Isolation (AuthZ)", + "PASS", + "Outsider got 'Not a member' for get_messages, mark_read, send_message.", + ) + + +async def test_session_reset_no_shared(outsider: "ChatClient", peer_user_id: str) -> TestResult: + """session_reset must be rejected without shared conversation.""" + resp = await outsider.send_and_recv("session_reset", peer_user_id=peer_user_id) + if resp.get("status") == "error" and "No shared conversation" in _msg(resp): + return TestResult("Session Reset Authorization", "PASS", "Rejected with 'No shared conversation'.") + if resp.get("status") == "ok": + return TestResult( + "Session Reset Authorization", + "SKIP", + "Outsider appears to share a conversation with peer in current dataset.", + ) + return TestResult( + "Session Reset Authorization", + "FAIL", + f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}", + ) + + +async def test_malformed_header_rejected(member: "ChatClient", conv_id: str) -> TestResult: + """Oversized ratchet header should be rejected by server-side validation.""" + huge_header = {"dh_pub": "A" * 5000, "n": 1, "pn": 0} + resp = await member.send_and_recv( + "send_message", + conversation_id=conv_id, + ratchet_header=huge_header, + recipients=[{}], + ) + if resp.get("status") == "error" and "Invalid ratchet_header format" in _msg(resp): + return TestResult("Malformed Header Rejection", "PASS", "Oversized ratchet_header rejected.") + return TestResult( + "Malformed Header Rejection", + "FAIL", + f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}", + ) + + +async def test_login_rate_limits() -> TestResult: + """Validate login_start per-email(case-insensitive) and per-IP limits.""" + probe = await _connect_client() + try: + stamp = int(time.time()) + base_local = f"pentest-login-{stamp}" + base_domain = "example.invalid" + base_email = f"{base_local}@{base_domain}" + case_variants = [ + base_email, + f"{base_local.upper()}@{base_domain}", + f"{base_local.capitalize()}@{base_domain}", + f"{base_local}@{base_domain.upper()}", + f"{base_local.swapcase()}@{base_domain}", + base_email, + f"{base_local.upper()}@{base_domain}", + f"{base_local.capitalize()}@{base_domain}", + f"{base_local}@{base_domain.upper()}", + f"{base_local.swapcase()}@{base_domain}", + base_email, # should exceed per-email bucket (10/min) + ] + + email_bucket_triggered = False + phase1_last = "" + for e in case_variants: + resp = await probe.send_and_recv("login_start", email=e) + phase1_last = _msg(resp) + if _too_many_attempts(resp): + email_bucket_triggered = True + await asyncio.sleep(0.12) # stay under per-connection 20 req/s limiter + + ip_bucket_triggered = False + phase2_last = "" + for i in range(1, 16): + unique_email = f"{base_local}-{i}@{base_domain}" + resp = await probe.send_and_recv("login_start", email=unique_email) + phase2_last = _msg(resp) + if _too_many_attempts(resp): + ip_bucket_triggered = True + break + await asyncio.sleep(0.12) + + if email_bucket_triggered and ip_bucket_triggered: + return TestResult( + "Login Rate Limits (case + per-IP)", + "PASS", + "Per-email(case-insensitive) and per-IP login_start limits both triggered.", + ) + return TestResult( + "Login Rate Limits (case + per-IP)", + "FAIL", + ( + f"email_bucket_triggered={email_bucket_triggered}, " + f"ip_bucket_triggered={ip_bucket_triggered}, " + f"phase1_last={phase1_last!r}, phase2_last={phase2_last!r}" + ), + ) + finally: + await _close_client(probe) + + +def _pick_password(flag_value: str | None, prompt: str) -> str: + if flag_value is not None: + return flag_value + return getpass.getpass(prompt) + + +async def run(args: argparse.Namespace) -> int: + if len({args.member_email.lower(), args.peer_email.lower(), args.outsider_email.lower()}) != 3: + print("ERROR: member/peer/outsider emails must be three different accounts.", file=sys.stderr) + return 2 + + if args.server_host: + os.environ["SERVER_HOST"] = args.server_host + if args.server_port is not None: + os.environ["SERVER_PORT"] = str(args.server_port) + + effective_host = os.getenv("SERVER_HOST", "127.0.0.1").strip() + if effective_host == "0.0.0.0": + print( + "ERROR: SERVER_HOST=0.0.0.0 je bind adresa serveru, ne klientský TLS hostname.\n" + "Pouzij --server-host (napr. localhost nebo 192.168.1.112).", + file=sys.stderr, + ) + return 2 + + member_password = _pick_password(args.member_password, f"Password for {args.member_email}: ") + peer_password = _pick_password(args.peer_password, f"Password for {args.peer_email}: ") + outsider_password = _pick_password(args.outsider_password, f"Password for {args.outsider_email}: ") + + member: "ChatClient | None" = None + peer: "ChatClient | None" = None + outsider: "ChatClient | None" = None + results: list[TestResult] = [] + + try: + print("[setup] Logging in member account...") + member, _ = await _login_client(args.member_email, member_password) + print("[setup] Logging in peer account...") + peer, _ = await _login_client(args.peer_email, peer_password) + print("[setup] Logging in outsider account...") + outsider, _ = await _login_client(args.outsider_email, outsider_password) + + if args.conversation_id: + conv_id = args.conversation_id + else: + print("[setup] Finding/creating member<->peer direct conversation...") + conv_id, err = await member.find_or_create_conversation(args.peer_email) + if not conv_id: + raise RuntimeError(f"Could not find/create conversation: {err}") + + outsider_convs = {c["conversation_id"] for c in await outsider.list_conversations()} + if conv_id in outsider_convs: + results.append( + TestResult( + "Conversation Isolation (AuthZ)", + "SKIP", + "Outsider is already a member of target conversation; choose different outsider/account set.", + ) + ) + else: + results.append(await test_conversation_isolation(outsider, conv_id)) + results.append(await test_malformed_header_rejected(member, conv_id)) + + results.append(await test_session_reset_no_shared(outsider, peer.session["user_id"])) + + if not args.skip_login_rate_limit: + results.append(await test_login_rate_limits()) + + except ssl.SSLCertVerificationError as e: + results.append( + TestResult( + "Harness Setup", + "FAIL", + ( + f"{e}. Zkus --server-host s hodnotou ze SAN/CN certifikatu " + "(napr. localhost nebo 192.168.1.112)." + ), + ) + ) + except Exception as e: + emsg = str(e) + if "CERTIFICATE_VERIFY_FAILED" in emsg or "IP address mismatch" in emsg: + emsg += ( + " | Hint: pouzij --server-host s hostname/IP, ktery je v certifikatu " + "(SERVER_HOST nesmi byt 0.0.0.0)." + ) + results.append(TestResult("Harness Setup", "FAIL", emsg)) + finally: + await _close_client(member) + await _close_client(peer) + await _close_client(outsider) + + print("\n=== Pentest Results ===") + for r in results: + print(f"[{r.outcome}] {r.name}: {r.details}") + + has_fail = any(r.outcome == "FAIL" for r in results) + return 1 if has_fail else 0 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run focused pentest/integration checks against encrypted_chat server." + ) + parser.add_argument("--member-email", required=True, help="Email of regular account A (conversation member).") + parser.add_argument("--peer-email", required=True, help="Email of regular account B (other conversation member).") + parser.add_argument("--outsider-email", required=True, help="Email of regular account C (must not be in target conversation).") + parser.add_argument( + "--server-host", + default=None, + help="Override SERVER_HOST for this run (must match TLS cert SAN/CN).", + ) + parser.add_argument( + "--server-port", + type=int, + default=None, + help="Override SERVER_PORT for this run.", + ) + parser.add_argument("--member-password", default=None, help="Password for --member-email (optional; prompt if omitted).") + parser.add_argument("--peer-password", default=None, help="Password for --peer-email (optional; prompt if omitted).") + parser.add_argument("--outsider-password", default=None, help="Password for --outsider-email (optional; prompt if omitted).") + parser.add_argument( + "--conversation-id", + default=None, + help="Optional target conversation UUID. If omitted, member<->peer DM is found/created automatically.", + ) + parser.add_argument( + "--skip-login-rate-limit", + action="store_true", + help="Skip anonymous login_start rate-limit regression check.", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + return asyncio.run(run(args)) + + +if __name__ == "__main__": + raise SystemExit(main())