1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | from datetime import datetime, timezone
import pytest
from metergate import MemoryStorage, MeterGate, Period, SQLiteStorage, format_size, parse_size
def stores():
    """Build one new instance of every storage backend under test.

    Returns:
        A two-element list: a ``MemoryStorage`` followed by a
        ``SQLiteStorage`` opened on ``":memory:"``. It is used as the
        argument list for the ``store``-parametrized tests in this module.
    """
    in_memory = MemoryStorage()
    sqlite_backed = SQLiteStorage(":memory:")
    return [in_memory, sqlite_backed]
def _seed(store):
    """Return a ``MeterGate`` over *store* preloaded with shared fixtures.

    The gate is created with ``period="month"`` and receives:

    * group ``"free"`` with ``{"media": 100}``
    * group ``"pro"`` with ``{"media": 1000}``
    * capability ``"upload"`` listing the ``"media"`` resource

    Args:
        store: Storage backend instance handed straight to ``MeterGate``.

    Returns:
        The configured ``MeterGate``.
    """
    gate = MeterGate(store, period="month")
    # The second define_group is deliberately invoked on the return value of
    # the first, so the chained form of the call is what gets exercised.
    (
        gate.define_group("free", {"media": 100})
        .define_group("pro", {"media": 1000})
    )
    gate.define_capability("upload", ["media"])
    return gate
@pytest.mark.parametrize("store", stores())
def test_limit_resolution(store):
    """Check the order in which ``limit_for`` resolves a subject's limit.

    Expected precedence: a per-subject override, then the subject's group
    limit, and ``None`` for a subject that was never assigned a group.
    """
    gate = _seed(store)
    gate.set_subject("a", "free")

    # Baseline: the limit comes from the "free" group.
    group_limit = gate.limit_for("a", "media")
    assert group_limit == 100

    gate.set_override("a", "media", 250)
    overridden_limit = gate.limit_for("a", "media")
    assert overridden_limit == 250  # override wins

    # Setting the override to None removes it again.
    gate.set_override("a", "media", None)
    restored_limit = gate.limit_for("a", "media")
    assert restored_limit == 100  # falls back to group

    assert gate.limit_for("unknown", "media") is None  # no group = unlimited
@pytest.mark.parametrize("store", stores())
def test_use_check_and_gate(store):
    """Record usage below and above the limit and verify the gate reacts.

    Subject ``"a"`` is in the ``"free"`` group (media limit 100). After 90
    units the resource check and the ``"upload"`` capability both pass;
    after 20 more (110 total) the subject is over quota everywhere.
    """
    gate = _seed(store)
    gate.set_subject("a", "free")

    # 90 of 100: still inside the allowance.
    gate.use("a", "media", 90)
    assert gate.check("a", "media").allowed
    assert gate.allowed("a", "upload")

    # 110 > 100: this call pushes the subject over the limit.
    decision = gate.use("a", "media", 20)
    assert decision.over
    assert decision.used == 110

    assert not gate.allowed("a", "upload")  # capability now denied
    assert "a" in gate.over_quota("media")
    assert gate.check("a", "media").remaining == 0
@pytest.mark.parametrize("store", stores())
def test_top_and_reset(store):
    """Verify the storage ``top`` ranking and that ``reset`` zeroes usage.

    Subject ``"b"`` uses more ``"media"`` than ``"a"`` (80 vs 50), so it
    must be the first entry of the ranking; after ``reset("b")`` its
    recorded usage must read 0.
    """
    gate = _seed(store)
    gate.use("a", "media", 50)
    gate.use("b", "media", 80)

    ranking = gate.storage.top("media", gate.period.key(), 5)
    leader = ranking[0]
    # Element 0 of each ranking entry is compared against the subject id.
    assert leader[0] == "b"

    gate.reset("b")
    usage_after_reset = gate.check("b", "media").used
    assert usage_after_reset == 0
def test_period_reset_across_month():
    """Usage recorded in one month must not count against the next month.

    Ten units are consumed on 2026-01-20 against a limit of 10. Checked at
    that same instant the subject is reported as over; checked at
    2026-02-01 the used amount is back to 0.
    """
    gate = MeterGate(MemoryStorage(), period="month")
    gate.define_group("g", {"r": 10})
    gate.set_subject("a", "g")

    late_january = datetime(2026, 1, 20, tzinfo=timezone.utc)
    start_of_february = datetime(2026, 2, 1, tzinfo=timezone.utc)

    gate.use("a", "r", 10, now=late_january)

    january_state = gate.check("a", "r", now=late_january)
    assert january_state.over

    february_state = gate.check("a", "r", now=start_of_february)
    assert february_state.used == 0  # fresh allowance next month
def test_period_keys():
    """Check ``Period.key`` for each period name and ``Period.reset_at``.

    All expectations are evaluated at 2026-06-14 12:00 UTC.
    """
    moment = datetime(2026, 6, 14, 12, 0, tzinfo=timezone.utc)

    # Period name -> key expected for ``moment``; checked in this order.
    expected_keys = {
        "month": "2026-06",
        "day": "2026-06-14",
        "never": "all",
    }
    for period_name, expected_key in expected_keys.items():
        assert Period(period_name).key(moment) == expected_key

    # A monthly period resets at the first instant of the following month.
    next_reset = Period("month").reset_at(moment)
    assert next_reset == datetime(2026, 7, 1, tzinfo=timezone.utc)
def test_sizes():
    """Check ``parse_size`` / ``format_size`` on strings, ints and bad input."""
    mebibyte = 1024 ** 2
    gibibyte = 1024 ** 3

    # The expectation treats "MB" as a multiple of 1024 ** 2.
    assert parse_size("500MB") == 500 * mebibyte
    assert parse_size("1GiB") == gibibyte
    # A plain integer is expected to come back unchanged.
    assert parse_size(2048) == 2048

    rendered = format_size(parse_size("5MB"))
    assert "MiB" in rendered

    # An unrecognised unit must be rejected with ValueError.
    with pytest.raises(ValueError):
        parse_size("12 furlongs")
def test_http_api():
    """Exercise the HTTP API end to end against a live in-process server.

    A ``MeterGate`` with one group (``"free"``, media limit 100), one
    capability (``"up"`` on ``"media"``) and one subject (``"a"``) is
    served by an ``HTTPServer`` on a loopback port. The test then:

    1. POSTs 90 units to ``/v1/use`` and expects ``used == 90``.
    2. GETs ``/v1/check`` for the capability and expects it allowed.
    3. POSTs 20 more units (110 > 100).
    4. Expects the capability to be denied and ``"a"`` to be listed by
       ``/v1/over``.

    The server socket, every HTTP response and the server thread are
    released explicitly so the test leaves no open resources behind.
    """
    import json as _json
    import threading
    import urllib.request
    from http.server import HTTPServer

    from metergate.server import make_handler

    mg = MeterGate(MemoryStorage(), period="month")
    mg.define_group("free", {"media": 100})
    mg.define_capability("up", ["media"])
    mg.set_subject("a", "free")

    # Port 0 lets the OS choose a free port; the real one is read back below.
    httpd = HTTPServer(("127.0.0.1", 0), make_handler(mg, token=None))
    port = httpd.server_address[1]
    server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    server_thread.start()

    def _fetch_json(request):
        # Close the response deterministically instead of leaking the
        # connection; the timeout keeps a wedged server from hanging the run.
        with urllib.request.urlopen(request, timeout=5) as response:
            return _json.load(response)

    def post(path, payload):
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}{path}",
            data=_json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        return _fetch_json(req)

    def get(path):
        return _fetch_json(f"http://127.0.0.1:{port}{path}")

    try:
        assert post("/v1/use", {"subject": "a", "resource": "media", "amount": 90})["used"] == 90
        assert get("/v1/check?subject=a&capability=up")["allowed"] is True
        post("/v1/use", {"subject": "a", "resource": "media", "amount": 20})  # 110 > 100
        assert get("/v1/check?subject=a&capability=up")["allowed"] is False
        assert "a" in get("/v1/over?resource=media")["subjects"]
    finally:
        # shutdown() only stops the serve_forever loop; server_close() is
        # what actually releases the listening socket.
        httpd.shutdown()
        httpd.server_close()
        server_thread.join(timeout=5)
def test_cli(tmp_path):
    """Drive the command-line entry point through a full quota scenario.

    Every invocation points at the same ``sqlite:`` storage string under
    *tmp_path*, so state set up by one command is visible to the next.
    The ``check`` command is expected to return 0 while the capability is
    allowed and 1 once usage (90 + 20) exceeds the limit of 100.

    Args:
        tmp_path: pytest's per-test temporary directory fixture.
    """
    from metergate.cli import main

    db = f"sqlite:{tmp_path}/q.db"

    def run(*command):
        # Each call builds its own argv list, prefixed with the storage flag.
        return main(["--storage", db, *command])

    # Configuration commands must all report success.
    assert run("group", "set", "free", "media=100") == 0
    assert run("cap", "set", "upload", "media") == 0
    assert run("user", "set", "a", "--group", "free") == 0

    run("use", "a", "media", "90")
    assert run("check", "a", "--cap", "upload") == 0  # allowed

    run("use", "a", "media", "20")
    assert run("check", "a", "--cap", "upload") == 1  # denied
|