Preserve pentest suite and cloudflare.ini.example from previous remote history

Carried over from the old Kecalek_python master (214da18).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Filip
2026-06-11 18:41:11 -04:00
parent 2e7b72307d
commit 4ded15d569
3 changed files with 428 additions and 0 deletions

79
tests/PENTEST_CLIENT.md Normal file
View File

@@ -0,0 +1,79 @@
# `tests/pentest_client.py`
Automatizovaný pentest/integration harness nad živým serverem s reálnými účty.
## Co test dělá
1. **Conversation Isolation (AuthZ)**
- Účet `outsider` zkouší `get_messages`, `mark_read` a `send_message` do konverzace, kde není členem.
- Očekávání: server vrátí `error` + `"Not a member"`.
2. **Malformed Header Rejection**
- Platný člen konverzace pošle `send_message` s obřím `ratchet_header`.
- Očekávání: server odmítne request (`Invalid ratchet_header format`), tj. funguje `_validate_header` limit.
3. **Session Reset Authorization**
- `outsider` pošle `session_reset` na `peer_user_id`.
- Očekávání: `error` + `"No shared conversation"`.
- Pokud účty sdílenou konverzaci opravdu mají, test se označí jako `SKIP` (setup issue, ne nutně bezpečnostní chyba).
4. **Login Rate Limits (volitelné)**
- Anonymní klient spamuje `login_start`:
- stejný email v různých kombinacích velikosti písmen (test case-normalization bucketu),
- potom rotace různých emailů ze stejné IP (test per-IP bucketu).
- Očekávání: aktivuje se jak per-email limit, tak per-IP limit.
## Požadavky
- Běžící server (`server.py`).
- Existující lokální klíče pro účty v `~/.encrypted_chat/<email>/` (stejné jako pro běžného CLI klienta).
- 3 různé účty:
- `member` (A),
- `peer` (B),
- `outsider` (C).
## Spuštění
```bash
python3 tests/pentest_client.py \
--server-host localhost \
--member-email alice@example.com \
--peer-email bob@example.com \
--outsider-email mallory@example.com
```
Skript si vyžádá hesla interaktivně. Lze je předat i argumenty:
```bash
python3 tests/pentest_client.py \
--server-host localhost \
--member-email alice@example.com --member-password '***' \
--peer-email bob@example.com --peer-password '***' \
--outsider-email mallory@example.com --outsider-password '***'
```
Volby:
- `--conversation-id <uuid>`: použije konkrétní konverzaci místo auto member<->peer DM.
- `--skip-login-rate-limit`: přeskočí test `login_start` limiteru.
- `--server-host <host>`: přepíše `SERVER_HOST` pro tento běh.
- `--server-port <port>`: přepíše `SERVER_PORT` pro tento běh.
Poznámka k TLS:
- Pokud máš v `.env` `SERVER_HOST=0.0.0.0`, je to správně pro server bind, ale klient na to nesmí přistupovat přes TLS.
- Pro klienta použij `--server-host` s hodnotou, která je v certifikátu (SAN/CN), typicky `localhost` nebo konkrétní IP.
## Výstup
Skript tiskne souhrn:
- `[PASS]` test prošel,
- `[FAIL]` test selhal (potenciální regrese),
- `[SKIP]` test nelze vyhodnotit kvůli dataset/setup podmínkám.
Návratový kód:
- `0` = bez failu,
- `1` = alespoň jeden fail,
- `2` = chyba vstupních parametrů.

338
tests/pentest_client.py Normal file
View File

@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""Security regression harness for encrypted_chat server.
Runs focused pentest/integration checks against a live server using real accounts.
"""
from __future__ import annotations
import argparse
import asyncio
import getpass
import os
import ssl
import sys
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
if TYPE_CHECKING:
from chat_core import ChatClient
@dataclass
class TestResult:
name: str
outcome: str # PASS | FAIL | SKIP
details: str
def _msg(resp: dict) -> str:
data = resp.get("data") or {}
return str(data.get("message", ""))
async def _connect_client() -> "ChatClient":
from chat_core import ChatClient # Imported lazily so --help works without full deps
client = ChatClient()
await client.connect()
client._listener_task = asyncio.create_task(client._background_listener())
return client
async def _login_client(email: str, password: str) -> tuple["ChatClient", str]:
client = await _connect_client()
ok, message = await client.login(email, password)
if not ok:
await client.close()
raise RuntimeError(f"Login failed for {email}: {message}")
return client, message
async def _close_client(client: "ChatClient | None"):
if not client:
return
try:
await client.close()
except Exception:
pass
def _too_many_attempts(resp: dict) -> bool:
return resp.get("status") == "error" and "Too many attempts" in _msg(resp)
async def test_conversation_isolation(outsider: "ChatClient", conv_id: str) -> TestResult:
"""Outsider must not access a conversation they are not a member of."""
fake_mid = str(uuid.uuid4())
checks: list[tuple[str, dict]] = [
(
"get_messages",
await outsider.send_and_recv("get_messages", conversation_id=conv_id, limit=5, offset=0),
),
(
"mark_read",
await outsider.send_and_recv("mark_read", conversation_id=conv_id, message_ids=[fake_mid]),
),
(
"send_message",
await outsider.send_and_recv("send_message", conversation_id=conv_id),
),
]
failures: list[str] = []
for endpoint, resp in checks:
if resp.get("status") != "error" or "Not a member" not in _msg(resp):
failures.append(f"{endpoint} -> status={resp.get('status')} message={_msg(resp)!r}")
if failures:
return TestResult("Conversation Isolation (AuthZ)", "FAIL", "; ".join(failures))
return TestResult(
"Conversation Isolation (AuthZ)",
"PASS",
"Outsider got 'Not a member' for get_messages, mark_read, send_message.",
)
async def test_session_reset_no_shared(outsider: "ChatClient", peer_user_id: str) -> TestResult:
"""session_reset must be rejected without shared conversation."""
resp = await outsider.send_and_recv("session_reset", peer_user_id=peer_user_id)
if resp.get("status") == "error" and "No shared conversation" in _msg(resp):
return TestResult("Session Reset Authorization", "PASS", "Rejected with 'No shared conversation'.")
if resp.get("status") == "ok":
return TestResult(
"Session Reset Authorization",
"SKIP",
"Outsider appears to share a conversation with peer in current dataset.",
)
return TestResult(
"Session Reset Authorization",
"FAIL",
f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}",
)
async def test_malformed_header_rejected(member: "ChatClient", conv_id: str) -> TestResult:
"""Oversized ratchet header should be rejected by server-side validation."""
huge_header = {"dh_pub": "A" * 5000, "n": 1, "pn": 0}
resp = await member.send_and_recv(
"send_message",
conversation_id=conv_id,
ratchet_header=huge_header,
recipients=[{}],
)
if resp.get("status") == "error" and "Invalid ratchet_header format" in _msg(resp):
return TestResult("Malformed Header Rejection", "PASS", "Oversized ratchet_header rejected.")
return TestResult(
"Malformed Header Rejection",
"FAIL",
f"Unexpected response: status={resp.get('status')} message={_msg(resp)!r}",
)
async def test_login_rate_limits() -> TestResult:
"""Validate login_start per-email(case-insensitive) and per-IP limits."""
probe = await _connect_client()
try:
stamp = int(time.time())
base_local = f"pentest-login-{stamp}"
base_domain = "example.invalid"
base_email = f"{base_local}@{base_domain}"
case_variants = [
base_email,
f"{base_local.upper()}@{base_domain}",
f"{base_local.capitalize()}@{base_domain}",
f"{base_local}@{base_domain.upper()}",
f"{base_local.swapcase()}@{base_domain}",
base_email,
f"{base_local.upper()}@{base_domain}",
f"{base_local.capitalize()}@{base_domain}",
f"{base_local}@{base_domain.upper()}",
f"{base_local.swapcase()}@{base_domain}",
base_email, # should exceed per-email bucket (10/min)
]
email_bucket_triggered = False
phase1_last = ""
for e in case_variants:
resp = await probe.send_and_recv("login_start", email=e)
phase1_last = _msg(resp)
if _too_many_attempts(resp):
email_bucket_triggered = True
await asyncio.sleep(0.12) # stay under per-connection 20 req/s limiter
ip_bucket_triggered = False
phase2_last = ""
for i in range(1, 16):
unique_email = f"{base_local}-{i}@{base_domain}"
resp = await probe.send_and_recv("login_start", email=unique_email)
phase2_last = _msg(resp)
if _too_many_attempts(resp):
ip_bucket_triggered = True
break
await asyncio.sleep(0.12)
if email_bucket_triggered and ip_bucket_triggered:
return TestResult(
"Login Rate Limits (case + per-IP)",
"PASS",
"Per-email(case-insensitive) and per-IP login_start limits both triggered.",
)
return TestResult(
"Login Rate Limits (case + per-IP)",
"FAIL",
(
f"email_bucket_triggered={email_bucket_triggered}, "
f"ip_bucket_triggered={ip_bucket_triggered}, "
f"phase1_last={phase1_last!r}, phase2_last={phase2_last!r}"
),
)
finally:
await _close_client(probe)
def _pick_password(flag_value: str | None, prompt: str) -> str:
if flag_value is not None:
return flag_value
return getpass.getpass(prompt)
async def run(args: argparse.Namespace) -> int:
if len({args.member_email.lower(), args.peer_email.lower(), args.outsider_email.lower()}) != 3:
print("ERROR: member/peer/outsider emails must be three different accounts.", file=sys.stderr)
return 2
if args.server_host:
os.environ["SERVER_HOST"] = args.server_host
if args.server_port is not None:
os.environ["SERVER_PORT"] = str(args.server_port)
effective_host = os.getenv("SERVER_HOST", "127.0.0.1").strip()
if effective_host == "0.0.0.0":
print(
"ERROR: SERVER_HOST=0.0.0.0 je bind adresa serveru, ne klientský TLS hostname.\n"
"Pouzij --server-host <hostname-nebo-ip-z-certifikatu> (napr. localhost nebo 192.168.1.112).",
file=sys.stderr,
)
return 2
member_password = _pick_password(args.member_password, f"Password for {args.member_email}: ")
peer_password = _pick_password(args.peer_password, f"Password for {args.peer_email}: ")
outsider_password = _pick_password(args.outsider_password, f"Password for {args.outsider_email}: ")
member: "ChatClient | None" = None
peer: "ChatClient | None" = None
outsider: "ChatClient | None" = None
results: list[TestResult] = []
try:
print("[setup] Logging in member account...")
member, _ = await _login_client(args.member_email, member_password)
print("[setup] Logging in peer account...")
peer, _ = await _login_client(args.peer_email, peer_password)
print("[setup] Logging in outsider account...")
outsider, _ = await _login_client(args.outsider_email, outsider_password)
if args.conversation_id:
conv_id = args.conversation_id
else:
print("[setup] Finding/creating member<->peer direct conversation...")
conv_id, err = await member.find_or_create_conversation(args.peer_email)
if not conv_id:
raise RuntimeError(f"Could not find/create conversation: {err}")
outsider_convs = {c["conversation_id"] for c in await outsider.list_conversations()}
if conv_id in outsider_convs:
results.append(
TestResult(
"Conversation Isolation (AuthZ)",
"SKIP",
"Outsider is already a member of target conversation; choose different outsider/account set.",
)
)
else:
results.append(await test_conversation_isolation(outsider, conv_id))
results.append(await test_malformed_header_rejected(member, conv_id))
results.append(await test_session_reset_no_shared(outsider, peer.session["user_id"]))
if not args.skip_login_rate_limit:
results.append(await test_login_rate_limits())
except ssl.SSLCertVerificationError as e:
results.append(
TestResult(
"Harness Setup",
"FAIL",
(
f"{e}. Zkus --server-host s hodnotou ze SAN/CN certifikatu "
"(napr. localhost nebo 192.168.1.112)."
),
)
)
except Exception as e:
emsg = str(e)
if "CERTIFICATE_VERIFY_FAILED" in emsg or "IP address mismatch" in emsg:
emsg += (
" | Hint: pouzij --server-host s hostname/IP, ktery je v certifikatu "
"(SERVER_HOST nesmi byt 0.0.0.0)."
)
results.append(TestResult("Harness Setup", "FAIL", emsg))
finally:
await _close_client(member)
await _close_client(peer)
await _close_client(outsider)
print("\n=== Pentest Results ===")
for r in results:
print(f"[{r.outcome}] {r.name}: {r.details}")
has_fail = any(r.outcome == "FAIL" for r in results)
return 1 if has_fail else 0
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run focused pentest/integration checks against encrypted_chat server."
)
parser.add_argument("--member-email", required=True, help="Email of regular account A (conversation member).")
parser.add_argument("--peer-email", required=True, help="Email of regular account B (other conversation member).")
parser.add_argument("--outsider-email", required=True, help="Email of regular account C (must not be in target conversation).")
parser.add_argument(
"--server-host",
default=None,
help="Override SERVER_HOST for this run (must match TLS cert SAN/CN).",
)
parser.add_argument(
"--server-port",
type=int,
default=None,
help="Override SERVER_PORT for this run.",
)
parser.add_argument("--member-password", default=None, help="Password for --member-email (optional; prompt if omitted).")
parser.add_argument("--peer-password", default=None, help="Password for --peer-email (optional; prompt if omitted).")
parser.add_argument("--outsider-password", default=None, help="Password for --outsider-email (optional; prompt if omitted).")
parser.add_argument(
"--conversation-id",
default=None,
help="Optional target conversation UUID. If omitted, member<->peer DM is found/created automatically.",
)
parser.add_argument(
"--skip-login-rate-limit",
action="store_true",
help="Skip anonymous login_start rate-limit regression check.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
return asyncio.run(run(args))
if __name__ == "__main__":
raise SystemExit(main())