from datetime import datetime, timezone import pytest from metergate import MemoryStorage, MeterGate, Period, SQLiteStorage, format_size, parse_size def stores(): return [MemoryStorage(), SQLiteStorage(":memory:")] def _seed(store): mg = MeterGate(store, period="month") mg.define_group("free", {"media": 100}).define_group("pro", {"media": 1000}) mg.define_capability("upload", ["media"]) return mg @pytest.mark.parametrize("store", stores()) def test_limit_resolution(store): mg = _seed(store) mg.set_subject("a", "free") assert mg.limit_for("a", "media") == 100 mg.set_override("a", "media", 250) assert mg.limit_for("a", "media") == 250 # override wins mg.set_override("a", "media", None) assert mg.limit_for("a", "media") == 100 # falls back to group assert mg.limit_for("unknown", "media") is None # no group = unlimited @pytest.mark.parametrize("store", stores()) def test_use_check_and_gate(store): mg = _seed(store) mg.set_subject("a", "free") mg.use("a", "media", 90) assert mg.check("a", "media").allowed assert mg.allowed("a", "upload") d = mg.use("a", "media", 20) # 110 > 100 assert d.over and d.used == 110 assert not mg.allowed("a", "upload") # capability now denied assert "a" in mg.over_quota("media") assert mg.check("a", "media").remaining == 0 @pytest.mark.parametrize("store", stores()) def test_top_and_reset(store): mg = _seed(store) mg.use("a", "media", 50) mg.use("b", "media", 80) assert mg.storage.top("media", mg.period.key(), 5)[0][0] == "b" mg.reset("b") assert mg.check("b", "media").used == 0 def test_period_reset_across_month(): mg = MeterGate(MemoryStorage(), period="month") mg.define_group("g", {"r": 10}) mg.set_subject("a", "g") jan = datetime(2026, 1, 20, tzinfo=timezone.utc) feb = datetime(2026, 2, 1, tzinfo=timezone.utc) mg.use("a", "r", 10, now=jan) assert mg.check("a", "r", now=jan).over assert mg.check("a", "r", now=feb).used == 0 # fresh allowance next month def test_period_keys(): n = datetime(2026, 6, 14, 12, 0, tzinfo=timezone.utc) assert Period("month").key(n) == "2026-06" assert Period("day").key(n) == "2026-06-14" assert Period("never").key(n) == "all" assert Period("month").reset_at(n) == datetime(2026, 7, 1, tzinfo=timezone.utc) def test_sizes(): assert parse_size("500MB") == 500 * 1024 ** 2 assert parse_size("1GiB") == 1024 ** 3 assert parse_size(2048) == 2048 assert "MiB" in format_size(parse_size("5MB")) with pytest.raises(ValueError): parse_size("12 furlongs") def test_http_api(): import json as _json import threading import urllib.request from http.server import HTTPServer from metergate.server import make_handler mg = MeterGate(MemoryStorage(), period="month") mg.define_group("free", {"media": 100}) mg.define_capability("up", ["media"]) mg.set_subject("a", "free") httpd = HTTPServer(("127.0.0.1", 0), make_handler(mg, token=None)) port = httpd.server_address[1] threading.Thread(target=httpd.serve_forever, daemon=True).start() def post(path, payload): req = urllib.request.Request( f"http://127.0.0.1:{port}{path}", data=_json.dumps(payload).encode(), headers={"Content-Type": "application/json"}, method="POST") return _json.load(urllib.request.urlopen(req)) def get(path): return _json.load(urllib.request.urlopen(f"http://127.0.0.1:{port}{path}")) try: assert post("/v1/use", {"subject": "a", "resource": "media", "amount": 90})["used"] == 90 assert get("/v1/check?subject=a&capability=up")["allowed"] is True post("/v1/use", {"subject": "a", "resource": "media", "amount": 20}) # 110 > 100 assert get("/v1/check?subject=a&capability=up")["allowed"] is False assert "a" in get("/v1/over?resource=media")["subjects"] finally: httpd.shutdown() def test_cli(tmp_path): from metergate.cli import main db = f"sqlite:{tmp_path}/q.db" assert main(["--storage", db, "group", "set", "free", "media=100"]) == 0 assert main(["--storage", db, "cap", "set", "upload", "media"]) == 0 assert main(["--storage", db, "user", "set", "a", "--group", "free"]) == 0 main(["--storage", db, "use", "a", "media", "90"]) assert main(["--storage", db, "check", "a", "--cap", "upload"]) == 0 # allowed main(["--storage", db, "use", "a", "media", "20"]) assert main(["--storage", db, "check", "a", "--cap", "upload"]) == 1 # denied