egor/metergate — commit
metergate 0.1.0 — tiered usage quotas, capability gating, CLI + HTTP API
.gitignoreLICENSELICENSES/0BSD.txtLICENSES/BSD-2-Clause.txtLICENSES/BSD-3-Clause.txtLICENSES/ISC.txtLICENSES/MIT.txtLICENSES/README.mdLICENSES/Unlicense.txtLICENSES/WTFPL.txtREADME.mdpyproject.tomlsrc/metergate/__init__.pysrc/metergate/cli.pysrc/metergate/errors.pysrc/metergate/manager.pysrc/metergate/models.pysrc/metergate/period.pysrc/metergate/server.pysrc/metergate/sizes.pysrc/metergate/storage/__init__.pysrc/metergate/storage/base.pysrc/metergate/storage/memory.pysrc/metergate/storage/postgres.pysrc/metergate/storage/redis.pysrc/metergate/storage/sqlite.pytests/test_metergate.py
commit 67e3b340da3664ad1f0c97833f08dda7d41d8357
Author: egor <egor@noctentrion.com>
Date: Sun Jun 14 20:19:02 2026 +0200
metergate 0.1.0 — tiered usage quotas, capability gating, CLI + HTTP API
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..aa1109d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+__pycache__/
+*.py[cod]
+*.egg-info/
+.pytest_cache/
+dist/
+build/
+.venv/
+*.db
+*.sqlite3
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5391517
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2026 Egor Ivanov
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/LICENSES/0BSD.txt b/LICENSES/0BSD.txt
new file mode 100644
index 0000000..7b66491
--- /dev/null
+++ b/LICENSES/0BSD.txt
@@ -0,0 +1,14 @@
+BSD Zero Clause License
+
+Copyright (C) <year> by <author>
+
+Permission to use, copy, modify, and/or distribute this software for any
+purpose with or without fee is hereby granted.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
diff --git a/LICENSES/BSD-2-Clause.txt b/LICENSES/BSD-2-Clause.txt
new file mode 100644
index 0000000..f70f8cb
--- /dev/null
+++ b/LICENSES/BSD-2-Clause.txt
@@ -0,0 +1,24 @@
+BSD 2-Clause License
+
+Copyright (c) <year>, <owner>
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/LICENSES/BSD-3-Clause.txt b/LICENSES/BSD-3-Clause.txt
new file mode 100644
index 0000000..e3bc45d
--- /dev/null
+++ b/LICENSES/BSD-3-Clause.txt
@@ -0,0 +1,28 @@
+BSD 3-Clause License
+
+Copyright (c) <year>, <owner>
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/LICENSES/ISC.txt b/LICENSES/ISC.txt
new file mode 100644
index 0000000..ee93fcb
--- /dev/null
+++ b/LICENSES/ISC.txt
@@ -0,0 +1,15 @@
+ISC License
+
+Copyright (c) <year>, <copyright holder>
+
+Permission to use, copy, modify, and/or distribute this software for any
+purpose with or without fee is hereby granted, provided that the above
+copyright notice and this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
diff --git a/LICENSES/MIT.txt b/LICENSES/MIT.txt
new file mode 100644
index 0000000..242da62
--- /dev/null
+++ b/LICENSES/MIT.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) <year> <copyright holders>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/LICENSES/README.md b/LICENSES/README.md
new file mode 100644
index 0000000..f1cc628
--- /dev/null
+++ b/LICENSES/README.md
@@ -0,0 +1,37 @@
+# LICENSES
+
+metergate itself is MIT (see [`../LICENSE`](../LICENSE)). This folder is a small
+archive of the licenses you're most likely to actually reach for — kept here so
+you don't go fishing across the internet the next time you start a project at 2am.
+
+## In here, in full
+
+- **MIT** — the default, the one everyone grabs first.
+- **BSD-2-Clause**, **BSD-3-Clause** — MIT with extra paperwork.
+- **ISC** — MIT after a diet.
+- **0BSD** — like ISC, minus even the attribution requirement.
+- **Unlicense** — for when you want to drop it into the public domain and walk away.
+- **WTFPL** — *the awkward one.* It is a real, used-in-anger license whose entire
+ operative clause is "You just DO WHAT THE FUCK YOU WANT TO." Legally it mostly
+ holds up; in a code review it will start a conversation. Use responsibly — or,
+ per its own terms, don't.
+
+## Not in here, on purpose
+
+The heavyweights — **Apache-2.0**, **MPL-2.0**, **LGPL-3.0**, **GPL-3.0**,
+**AGPL-3.0** — are each many kilobytes of load-bearing legalese where one
+transposed word genuinely matters. Retyping those from memory is how you end up
+shipping a license that says the opposite of what you meant. Grab the canonical
+text by SPDX id instead:
+
+| SPDX | canonical text |
+|------|----------------|
+| Apache-2.0 | https://spdx.org/licenses/Apache-2.0.html |
+| MPL-2.0 | https://spdx.org/licenses/MPL-2.0.html |
+| LGPL-3.0-or-later | https://spdx.org/licenses/LGPL-3.0-or-later.html |
+| GPL-3.0-or-later | https://spdx.org/licenses/GPL-3.0-or-later.html |
+| AGPL-3.0-or-later | https://spdx.org/licenses/AGPL-3.0-or-later.html |
+
+Honorable mention to the *JSON License* ("The Software shall be used for Good,
+not Evil"), which is so awkward that real companies have had to formally request
+permission to use it for evil. We did not include it. We were tempted.
diff --git a/LICENSES/Unlicense.txt b/LICENSES/Unlicense.txt
new file mode 100644
index 0000000..edc46f3
--- /dev/null
+++ b/LICENSES/Unlicense.txt
@@ -0,0 +1,21 @@
+This is free and unencumbered software released into the public domain.
+
+Anyone is free to copy, modify, publish, use, compile, sell, or distribute this
+software, either in source code form or as a compiled binary, for any purpose,
+commercial or non-commercial, and by any means.
+
+In jurisdictions that recognize copyright laws, the author or authors of this
+software dedicate any and all copyright interest in the software to the public
+domain. We make this dedication for the benefit of the public at large and to
+the detriment of our heirs and successors. We intend this dedication to be an
+overt act of relinquishment in perpetuity of all present and future rights to
+this software under copyright law.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+For more information, please refer to <https://unlicense.org>
diff --git a/LICENSES/WTFPL.txt b/LICENSES/WTFPL.txt
new file mode 100644
index 0000000..5c93f45
--- /dev/null
+++ b/LICENSES/WTFPL.txt
@@ -0,0 +1,13 @@
+ DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
+ Version 2, December 2004
+
+ Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
+
+ Everyone is permitted to copy and distribute verbatim or modified
+ copies of this license document, and changing it is allowed as long
+ as the name is changed.
+
+ DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. You just DO WHAT THE FUCK YOU WANT TO.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f5e3b83
--- /dev/null
+++ b/README.md
@@ -0,0 +1,114 @@
+# metergate
+
+Per-subject usage quotas with **groups**, **per-user overrides**, an automatic
+**monthly reset**, and **capability gating** — i.e. *"you've spent your media
+allowance for the month, so uploads are off, but you can still type."*
+Standalone, zero required dependencies, and you can drive it from **Python, a
+CLI, or an HTTP API**.
+
+*Initially developed for Noctentrion (a continuwuity-based Matrix homeserver).*
+
+## Why this exists
+
+Synapse enforces per-user rate limits and per-user media quotas natively.
+conduwuit — and continuwuity, the fork it became — does not. It is a lightweight,
+RocksDB-backed homeserver: it has a global media size cap and admin-side media
+pruning, but no per-user rate-limit or media-quota subsystem, and no admin
+surface to configure one. That gap has held across conduwuit's lifetime and
+through continuwuity's current 0.5.x line (0.5.9 as of writing).
+
+So the limit has to live outside the homeserver. metergate is that: a small,
+generic quota engine you point at whatever per-user usage signal you already
+collect — here, upload bytes — and let it decide what is still allowed.
+
+> Packaging, tests, the HTTP API, and final refinement done with Claude.
+
+## Install
+
+```
+pip install metergate # core: zero dependencies
+pip install "metergate[postgres]" # or [redis]
+```
+
+## Concepts
+
+- **resource** — a quantity you count: `media_bytes`, `requests`, `messages`.
+- **group** — a tier with per-resource limits, e.g. `free` or `patron`.
+- **override** — a per-user limit that takes precedence over the group.
+- **period** — when the counter resets: `month` (default), `week`, `day`, or
+ `never`. Usage is bucketed by period, so the reset is automatic.
+- **capability** — an action gated by one or more resources. If any of them is
+ over its limit, the capability is denied and nothing else is affected.
+
+Limit resolution is override → group → unlimited.
+
+## Use it from Python
+
+```python
+from metergate import MeterGate, SQLiteStorage
+
+mg = MeterGate(SQLiteStorage("quota.db"), period="month")
+
+mg.define_group("free", {"media_bytes": "500MB"})
+mg.define_group("patron", {"media_bytes": "50GB"})
+mg.define_capability("upload_media", ["media_bytes"]) # gated by media_bytes
+
+mg.set_subject("@alice", group="free")
+mg.set_override("@alice", "media_bytes", "1GB") # @alice gets a bump
+
+mg.use("@alice", "media_bytes", "300MB") # record an upload
+mg.allowed("@alice", "upload_media") # True — still under 1GB
+mg.check("@alice", "media_bytes").remaining # bytes left this month
+mg.over_quota("media_bytes") # everyone who's blown it
+```
+
+## …or the CLI
+
+```
+metergate group set free media_bytes=500MB requests=10000
+metergate cap set upload_media media_bytes
+metergate user set @alice --group free
+metergate use @alice media_bytes 300MB
+metergate check @alice --cap upload_media # prints ALLOWED/DENIED, exits 0/1
+metergate over media_bytes # who's over this month
+metergate top media_bytes -n 10 # biggest users
+metergate export > quota.json # config out; `import` puts it back
+```
+
+Pick the backend with `--storage` (or `$METERGATE_STORAGE`):
+`memory` · `sqlite:quota.db` · `postgresql://…` · `redis://…`.
+
+## …or over HTTP, for everything that isn't Python
+
+```
+metergate serve --port 8400 --token s3cret
+```
+
+```
+curl -H 'Authorization: Bearer s3cret' \
+ 'http://localhost:8400/v1/check?subject=@alice&capability=upload_media'
+# {"subject":"@alice","capability":"upload_media","allowed":true}
+
+curl -H 'Authorization: Bearer s3cret' -X POST localhost:8400/v1/use \
+ -d '{"subject":"@alice","resource":"media_bytes","amount":"300MB"}'
+```
+
+Full surface: `GET /v1/check` · `GET /v1/usage` · `GET /v1/over` · `GET /v1/top`
+· `GET|POST /v1/groups` · `POST /v1/subjects` · `POST /v1/use` · `GET /healthz`.
+JSON in, JSON out, optional bearer token — so any platform that can hold a
+conversation over HTTP can integrate it. (That was the point of the question,
+yes — it has a real API now.)
+
+## The continuwuity use-case, concretely
+
+```python
+# in your media-upload path:
+if not mg.allowed(user_id, "upload_media"):
+ return forbidden("monthly media quota reached — text still works")
+mg.use(user_id, "media_bytes", len(blob))
+```
+
+## License
+
+MIT — see [`LICENSE`](LICENSE). There is also a [`LICENSES/`](LICENSES/) archive,
+explained (with a mostly straight face) in [`LICENSES/README.md`](LICENSES/README.md).
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..be73c80
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,39 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "metergate"
+version = "0.1.0"
+description = "Tiered usage quotas + capability gating: groups, per-user overrides, calendar-period reset."
+readme = "README.md"
+requires-python = ">=3.10"
+license = "MIT"
+authors = [{ name = "Egor Ivanov" }]
+keywords = ["quota", "rate-limit", "usage", "metering", "capability", "tiers", "groups"]
+classifiers = [
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python :: 3 :: Only",
+ "Topic :: System :: Systems Administration",
+]
+dependencies = []
+
+[project.optional-dependencies]
+postgres = ["psycopg[binary]>=3.1"]
+redis = ["redis>=5.0"]
+dev = ["pytest>=8"]
+
+[project.scripts]
+metergate = "metergate.cli:main"
+
+[project.urls]
+Homepage = "https://git.noctentrion.com/egor/metergate"
+Source = "https://git.noctentrion.com/egor/metergate"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/metergate"]
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
diff --git a/src/metergate/__init__.py b/src/metergate/__init__.py
new file mode 100644
index 0000000..b58b1f0
--- /dev/null
+++ b/src/metergate/__init__.py
@@ -0,0 +1,25 @@
+from .errors import MeterGateError, StorageError, UnknownCapability, UnknownGroup
+from .manager import MeterGate
+from .models import Decision
+from .period import Period
+from .sizes import format_size, parse_size
+from .storage import MemoryStorage, SQLiteStorage, Storage, storage_from_url
+
+__version__ = "0.1.0"
+
+__all__ = [
+ "MeterGate",
+ "Period",
+ "Decision",
+ "Storage",
+ "MemoryStorage",
+ "SQLiteStorage",
+ "storage_from_url",
+ "parse_size",
+ "format_size",
+ "MeterGateError",
+ "UnknownGroup",
+ "UnknownCapability",
+ "StorageError",
+ "__version__",
+]
diff --git a/src/metergate/cli.py b/src/metergate/cli.py
new file mode 100644
index 0000000..ebaa093
--- /dev/null
+++ b/src/metergate/cli.py
@@ -0,0 +1,265 @@
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+
+from . import __version__
+from .errors import MeterGateError
+from .manager import MeterGate
+from .sizes import format_size, parse_size
+from .storage import storage_from_url
+
+
+def _fmt(resource: str, n: int | None) -> str:
+ if n is None:
+ return "unlimited"
+ return f"{n} ({format_size(n)})" if "byte" in resource.lower() else str(n)
+
+
+def _pairs(items: list[str] | None) -> dict[str, int]:
+ out: dict[str, int] = {}
+ for item in items or []:
+ if "=" not in item:
+ raise SystemExit(f"expected resource=amount, got {item!r}")
+ key, value = item.split("=", 1)
+ out[key.strip()] = parse_size(value.strip())
+ return out
+
+
+def _mg(args) -> MeterGate:
+ return MeterGate(storage_from_url(args.storage), period=args.period)
+
+
+def cmd_group_set(mg: MeterGate, a) -> int:
+ for resource, value in _pairs(a.limits).items():
+ mg.set_group_limit(a.name, resource, value)
+ return cmd_group_list(mg, a)
+
+
+def cmd_group_list(mg: MeterGate, a) -> int:
+ data = {g: mg.storage.get_group_limits(g) for g in mg.storage.list_groups()}
+ if getattr(a, "json", False):
+ print(json.dumps(data))
+ return 0
+ if not data:
+ print("(no groups)")
+ return 0
+ for group, limits in data.items():
+ print(group)
+ for resource, value in sorted(limits.items()):
+ print(f" {resource:<22} {_fmt(resource, value)}")
+ return 0
+
+
+def cmd_group_rm(mg: MeterGate, a) -> int:
+ mg.storage.delete_group(a.name)
+ return 0
+
+
+def cmd_cap_set(mg: MeterGate, a) -> int:
+ mg.define_capability(a.name, a.resources)
+ return cmd_cap_list(mg, a)
+
+
+def cmd_cap_list(mg: MeterGate, a) -> int:
+ caps = mg.storage.list_capabilities()
+ if getattr(a, "json", False):
+ print(json.dumps(caps))
+ return 0
+ if not caps:
+ print("(no capabilities)")
+ return 0
+ for name, resources in caps.items():
+ print(f"{name:<22} gated by: {', '.join(resources)}")
+ return 0
+
+
+def cmd_user_set(mg: MeterGate, a) -> int:
+ if a.group is not None:
+ mg.set_subject(a.subject, a.group or None)
+ for resource, value in _pairs(a.limits).items():
+ mg.set_override(a.subject, resource, value)
+ return cmd_user_show(mg, a)
+
+
+def cmd_user_show(mg: MeterGate, a) -> int:
+ group = mg.storage.get_subject_group(a.subject)
+ overrides = mg.storage.get_overrides(a.subject)
+ if getattr(a, "json", False):
+ print(json.dumps({"subject": a.subject, "group": group, "overrides": overrides}))
+ return 0
+ print(f"{a.subject}")
+ print(f" group {group or '(none)'}")
+ if overrides:
+ print(" overrides")
+ for resource, value in sorted(overrides.items()):
+ print(f" {resource:<20} {_fmt(resource, value)}")
+ return 0
+
+
+def cmd_use(mg: MeterGate, a) -> int:
+ d = mg.use(a.subject, a.resource, a.amount)
+ return _print_decision(d, a)
+
+
+def cmd_check(mg: MeterGate, a) -> int:
+ if a.cap:
+ allowed = mg.allowed(a.subject, a.cap)
+ if getattr(a, "json", False):
+ print(json.dumps({"subject": a.subject, "capability": a.cap, "allowed": allowed}))
+ else:
+ print(f"{a.subject} {a.cap}: {'ALLOWED' if allowed else 'DENIED'}")
+ return 0 if allowed else 1
+ d = mg.check(a.subject, a.resource)
+ return _print_decision(d, a)
+
+
+def _print_decision(d, a) -> int:
+ if getattr(a, "json", False):
+ print(json.dumps({
+ "subject": d.subject, "resource": d.resource, "used": d.used,
+ "limit": d.limit, "remaining": d.remaining, "allowed": d.allowed,
+ "period": d.period, "reset_at": d.reset_at.isoformat() if d.reset_at else None,
+ }))
+ return 0 if d.allowed else 1
+ state = "OK" if d.allowed else "OVER"
+ used = _fmt(d.resource, d.used)
+ limit = _fmt(d.resource, d.limit)
+ extra = "" if d.limit is None else f" ({_fmt(d.resource, d.remaining)} left)"
+ print(f"[{state}] {d.subject} {d.resource}: {used} / {limit}{extra} resets {d.period}")
+ return 0 if d.allowed else 1
+
+
+def cmd_usage(mg: MeterGate, a) -> int:
+ rows = mg.usage(a.subject)
+ if getattr(a, "json", False):
+ print(json.dumps({r: {"used": d.used, "limit": d.limit, "allowed": d.allowed}
+ for r, d in rows.items()}))
+ return 0
+ if not rows:
+ print(f"{a.subject}: no usage")
+ return 0
+ for resource, d in rows.items():
+ _print_decision(d, a)
+ return 0
+
+
+def cmd_top(mg: MeterGate, a) -> int:
+ period = mg.period.key()
+ rows = mg.storage.top(a.resource, period, a.n)
+ if getattr(a, "json", False):
+ print(json.dumps(rows))
+ return 0
+ for subject, amount in rows:
+ print(f"{_fmt(a.resource, amount):>22} {subject}")
+ return 0
+
+
+def cmd_over(mg: MeterGate, a) -> int:
+ subjects = mg.over_quota(a.resource)
+ if getattr(a, "json", False):
+ print(json.dumps(subjects))
+ return 0
+ for s in subjects:
+ print(s)
+ return 0
+
+
+def cmd_reset(mg: MeterGate, a) -> int:
+ mg.reset(a.subject, all_periods=a.all)
+ return 0
+
+
+def cmd_export(mg: MeterGate, a) -> int:
+ data = {
+ "groups": {g: mg.storage.get_group_limits(g) for g in mg.storage.list_groups()},
+ "capabilities": mg.storage.list_capabilities(),
+ "subjects": {
+ s: {"group": mg.storage.get_subject_group(s), "overrides": mg.storage.get_overrides(s)}
+ for s in mg.storage.list_subjects()
+ },
+ }
+ print(json.dumps(data, indent=2))
+ return 0
+
+
+def cmd_import(mg: MeterGate, a) -> int:
+ raw = sys.stdin.read() if a.file in (None, "-") else open(a.file).read()
+ data = json.loads(raw)
+ for group, limits in data.get("groups", {}).items():
+ for resource, value in limits.items():
+ mg.set_group_limit(group, resource, value)
+ for name, resources in data.get("capabilities", {}).items():
+ mg.define_capability(name, resources)
+ for subject, info in data.get("subjects", {}).items():
+ if info.get("group"):
+ mg.set_subject(subject, info["group"])
+ for resource, value in (info.get("overrides") or {}).items():
+ mg.set_override(subject, resource, value)
+ print("imported", file=sys.stderr)
+ return 0
+
+
+def cmd_serve(mg: MeterGate, a) -> int:
+ from .server import serve
+ token = a.token or os.environ.get("METERGATE_API_TOKEN")
+ note = "" if token else " (no --token: open to anyone who can reach it)"
+ print(f"metergate API on http://{a.host}:{a.port}{note}", file=sys.stderr)
+ serve(mg, host=a.host, port=a.port, token=token)
+ return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+ p = argparse.ArgumentParser(prog="metergate", description="Tiered usage quotas + capability gating.")
+ p.add_argument("--storage", default=os.environ.get("METERGATE_STORAGE", "sqlite:metergate.db"),
+ help="backend url (default: $METERGATE_STORAGE or sqlite:metergate.db)")
+ p.add_argument("--period", default=os.environ.get("METERGATE_PERIOD", "month"),
+ choices=["month", "week", "day", "never"], help="reset window")
+ p.add_argument("--json", action="store_true", help="machine-readable output")
+ p.add_argument("--version", action="version", version=f"metergate {__version__}")
+ sub = p.add_subparsers(dest="cmd", required=True)
+
+ g = sub.add_parser("group", help="manage groups").add_subparsers(dest="sub", required=True)
+ gs = g.add_parser("set", help="set group limits"); gs.add_argument("name"); gs.add_argument("limits", nargs="*", metavar="resource=amount"); gs.set_defaults(fn=cmd_group_set)
+ g.add_parser("list", help="list groups").set_defaults(fn=cmd_group_list)
+ gr = g.add_parser("rm", help="delete a group"); gr.add_argument("name"); gr.set_defaults(fn=cmd_group_rm)
+
+ c = sub.add_parser("cap", help="manage capabilities").add_subparsers(dest="sub", required=True)
+ cs = c.add_parser("set", help="map a capability to gating resources"); cs.add_argument("name"); cs.add_argument("resources", nargs="+"); cs.set_defaults(fn=cmd_cap_set)
+ c.add_parser("list", help="list capabilities").set_defaults(fn=cmd_cap_list)
+
+ u = sub.add_parser("user", help="manage subjects").add_subparsers(dest="sub", required=True)
+ us = u.add_parser("set", help="set group and/or per-user overrides"); us.add_argument("subject"); us.add_argument("--group", default=None); us.add_argument("limits", nargs="*", metavar="resource=amount"); us.set_defaults(fn=cmd_user_set)
+ ush = u.add_parser("show", help="show a subject"); ush.add_argument("subject"); ush.set_defaults(fn=cmd_user_show)
+
+ pu = sub.add_parser("use", help="record usage"); pu.add_argument("subject"); pu.add_argument("resource"); pu.add_argument("amount"); pu.set_defaults(fn=cmd_use)
+ ck = sub.add_parser("check", help="check a resource or capability"); ck.add_argument("subject"); ck.add_argument("--resource"); ck.add_argument("--cap"); ck.set_defaults(fn=cmd_check)
+ ug = sub.add_parser("usage", help="show all usage for a subject"); ug.add_argument("subject"); ug.set_defaults(fn=cmd_usage)
+ tp = sub.add_parser("top", help="biggest users of a resource this period"); tp.add_argument("resource"); tp.add_argument("-n", type=int, default=20); tp.set_defaults(fn=cmd_top)
+ ov = sub.add_parser("over", help="subjects over their limit for a resource"); ov.add_argument("resource"); ov.set_defaults(fn=cmd_over)
+ rs = sub.add_parser("reset", help="clear usage for a subject"); rs.add_argument("subject"); rs.add_argument("--all", action="store_true", help="all periods, not just current"); rs.set_defaults(fn=cmd_reset)
+ sub.add_parser("export", help="dump config as JSON").set_defaults(fn=cmd_export)
+ im = sub.add_parser("import", help="load config from JSON (stdin or file)"); im.add_argument("file", nargs="?", default="-"); im.set_defaults(fn=cmd_import)
+ sv = sub.add_parser("serve", help="run the HTTP/JSON API"); sv.add_argument("--host", default="127.0.0.1"); sv.add_argument("--port", type=int, default=8400); sv.add_argument("--token", default=None, help="bearer token (or $METERGATE_API_TOKEN)"); sv.set_defaults(fn=cmd_serve)
+ return p
+
+
+def main(argv: list[str] | None = None) -> int:
+ args = build_parser().parse_args(argv)
+ if getattr(args, "cmd", None) == "check" and not args.resource and not args.cap:
+ print("check needs --resource or --cap", file=sys.stderr)
+ return 2
+ mg = _mg(args)
+ try:
+ return args.fn(mg, args)
+ except MeterGateError as e:
+ print(f"error: {e}", file=sys.stderr)
+ return 1
+ finally:
+ mg.close()
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/src/metergate/errors.py b/src/metergate/errors.py
new file mode 100644
index 0000000..3cda9cc
--- /dev/null
+++ b/src/metergate/errors.py
@@ -0,0 +1,14 @@
+class MeterGateError(Exception):
+ """Base class for all metergate errors."""
+
+
+class UnknownGroup(MeterGateError):
+ pass
+
+
+class UnknownCapability(MeterGateError):
+ pass
+
+
+class StorageError(MeterGateError):
+ pass
diff --git a/src/metergate/manager.py b/src/metergate/manager.py
new file mode 100644
index 0000000..e7f4ecf
--- /dev/null
+++ b/src/metergate/manager.py
@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Mapping
+
+from .errors import UnknownCapability
+from .models import Decision
+from .period import Period
+from .sizes import parse_size
+from .storage.base import Storage
+from .storage.memory import MemoryStorage
+
+
+class MeterGate:
+ """Tiered usage quotas with per-user overrides and capability gating.
+
+ Limit resolution per (subject, resource): per-user override → the subject's
+ group limit → unlimited. Usage accrues into the current period bucket, so
+ the allowance resets when the period rolls over.
+ """
+
+ def __init__(
+ self,
+ storage: Storage | None = None,
+ period: Period | str = "month",
+ default_group: str | None = None,
+ ):
+ self.storage = storage if storage is not None else MemoryStorage()
+ self.period = period if isinstance(period, Period) else Period(period)
+ self.default_group = default_group
+
+ # ── configuration ───────────────────────────────────────────────────────
+ def define_group(self, group: str, limits: Mapping[str, int | str | None] | None = None) -> "MeterGate":
+ for resource, limit in (limits or {}).items():
+ self.set_group_limit(group, resource, limit)
+ return self
+
+ def set_group_limit(self, group: str, resource: str, limit: int | str | None) -> None:
+ self.storage.set_group_limit(group, resource, None if limit is None else parse_size(limit))
+
+ def define_capability(self, name: str, resources: list[str]) -> "MeterGate":
+ self.storage.set_capability(name, list(resources))
+ return self
+
+ def set_subject(self, subject: str, group: str | None = None) -> None:
+ self.storage.set_subject_group(subject, group)
+
+ def set_override(self, subject: str, resource: str, limit: int | str | None) -> None:
+ self.storage.set_override(subject, resource, None if limit is None else parse_size(limit))
+
+ # ── limit resolution ────────────────────────────────────────────────────
+ def limit_for(self, subject: str, resource: str) -> int | None:
+ override = self.storage.get_override(subject, resource)
+ if override is not None:
+ return override
+ group = self.storage.get_subject_group(subject) or self.default_group
+ if group:
+ return self.storage.get_group_limits(group).get(resource)
+ return None
+
+ # ── usage + checks ──────────────────────────────────────────────────────
+ def use(self, subject: str, resource: str, amount: int | str, now: datetime | None = None) -> Decision:
+ used = self.storage.add_usage(subject, resource, self.period.key(now), parse_size(amount))
+ return self._decision(subject, resource, used, now)
+
+ def check(self, subject: str, resource: str, now: datetime | None = None) -> Decision:
+ used = self.storage.get_usage(subject, resource, self.period.key(now))
+ return self._decision(subject, resource, used, now)
+
+ def _decision(self, subject: str, resource: str, used: int, now: datetime | None) -> Decision:
+ return Decision(
+ subject=subject,
+ resource=resource,
+ used=used,
+ limit=self.limit_for(subject, resource),
+ period=self.period.key(now),
+ reset_at=self.period.reset_at(now),
+ )
+
+ def allowed(self, subject: str, capability: str, now: datetime | None = None) -> bool:
+ resources = self.storage.get_capability(capability)
+ if resources is None:
+ raise UnknownCapability(capability)
+ return all(self.check(subject, r, now).allowed for r in resources)
+
+ def usage(self, subject: str, now: datetime | None = None) -> dict[str, Decision]:
+ key = self.period.key(now)
+ resources = set(self.storage.get_overrides(subject)) | set(self.storage.get_all_usage(subject, key))
+ group = self.storage.get_subject_group(subject) or self.default_group
+ if group:
+ resources |= set(self.storage.get_group_limits(group))
+ return {r: self.check(subject, r, now) for r in sorted(resources)}
+
+ def reset(self, subject: str, now: datetime | None = None, all_periods: bool = True) -> None:
+ self.storage.reset_usage(subject, None if all_periods else self.period.key(now))
+
+ def over_quota(self, resource: str, now: datetime | None = None) -> list[str]:
+ """Subjects currently at/over their limit for `resource` (for enforcement)."""
+ return [s for s in self.storage.list_subjects() if self.check(s, resource, now).over]
+
+ def close(self) -> None:
+ self.storage.close()
diff --git a/src/metergate/models.py b/src/metergate/models.py
new file mode 100644
index 0000000..af561fa
--- /dev/null
+++ b/src/metergate/models.py
@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+
+
+@dataclass(frozen=True)
+class Decision:
+ """The outcome of checking one subject against one resource."""
+
+ subject: str
+ resource: str
+ used: int
+ limit: int | None # None = unlimited
+ period: str
+ reset_at: datetime | None
+
+ @property
+ def allowed(self) -> bool:
+ return self.limit is None or self.used < self.limit
+
+ @property
+ def over(self) -> bool:
+ return not self.allowed
+
+ @property
+ def remaining(self) -> int | None:
+ return None if self.limit is None else max(0, self.limit - self.used)
+
+ @property
+ def ratio(self) -> float | None:
+ if not self.limit:
+ return None
+ return self.used / self.limit
diff --git a/src/metergate/period.py b/src/metergate/period.py
new file mode 100644
index 0000000..09a7a87
--- /dev/null
+++ b/src/metergate/period.py
@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+KINDS = ("month", "week", "day", "never")
+
+
+class Period:
+ """A reset window. Usage is bucketed by the period key; a new bucket means a
+ fresh allowance, so resets are implicit. `month` resets on the 1st (UTC)."""
+
+ __slots__ = ("kind",)
+
+ def __init__(self, kind: str = "month"):
+ kind = kind.lower()
+ if kind not in KINDS:
+ raise ValueError(f"unknown period {kind!r}; choose from {KINDS}")
+ self.kind = kind
+
+ def __repr__(self) -> str:
+ return f"Period({self.kind!r})"
+
+ def __eq__(self, other: object) -> bool:
+ return isinstance(other, Period) and other.kind == self.kind
+
+ @staticmethod
+ def _now(now: datetime | None) -> datetime:
+ return now if now is not None else datetime.now(timezone.utc)
+
+ def key(self, now: datetime | None = None) -> str:
+ n = self._now(now)
+ if self.kind == "never":
+ return "all"
+ if self.kind == "day":
+ return n.strftime("%Y-%m-%d")
+ if self.kind == "week":
+ iso = n.isocalendar()
+ return f"{iso.year:04d}-W{iso.week:02d}"
+ return n.strftime("%Y-%m")
+
+ def reset_at(self, now: datetime | None = None) -> datetime | None:
+ n = self._now(now)
+ midnight = n.replace(hour=0, minute=0, second=0, microsecond=0)
+ if self.kind == "never":
+ return None
+ if self.kind == "day":
+ return midnight + timedelta(days=1)
+ if self.kind == "week":
+ return midnight + timedelta(days=8 - n.isoweekday())
+ if n.month == 12:
+ return midnight.replace(year=n.year + 1, month=1, day=1)
+ return midnight.replace(month=n.month + 1, day=1)
diff --git a/src/metergate/server.py b/src/metergate/server.py
new file mode 100644
index 0000000..542e395
--- /dev/null
+++ b/src/metergate/server.py
@@ -0,0 +1,108 @@
+"""A tiny zero-dependency HTTP/JSON API, so anything that speaks HTTP can use
+metergate — not just Python. Single-threaded on purpose: quota checks are cheap
+and infrequent, and it keeps every storage backend safe without a pool."""
+from __future__ import annotations
+
+import json
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from urllib.parse import parse_qs, urlparse
+
+from .errors import MeterGateError
+from .manager import MeterGate
+
+
+def _decision(d) -> dict:
+ return {
+ "subject": d.subject, "resource": d.resource, "used": d.used,
+ "limit": d.limit, "remaining": d.remaining, "allowed": d.allowed,
+ "period": d.period, "reset_at": d.reset_at.isoformat() if d.reset_at else None,
+ }
+
+
+def make_handler(mg: MeterGate, token: str | None):
+ class Handler(BaseHTTPRequestHandler):
+ protocol_version = "HTTP/1.1"
+
+ def log_message(self, *a): # stay quiet
+ pass
+
+ def _send(self, code: int, obj: dict):
+ body = json.dumps(obj).encode()
+ self.send_response(code)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _authed(self) -> bool:
+ return token is None or self.headers.get("Authorization") == f"Bearer {token}"
+
+ def do_GET(self):
+ u = urlparse(self.path)
+ if u.path == "/healthz":
+ return self._send(200, {"ok": True})
+ if not self._authed():
+ return self._send(401, {"error": "unauthorized"})
+ q = parse_qs(u.query)
+ try:
+ if u.path == "/v1/check":
+ subject = q["subject"][0]
+ if "capability" in q:
+ cap = q["capability"][0]
+ return self._send(200, {"subject": subject, "capability": cap, "allowed": mg.allowed(subject, cap)})
+ return self._send(200, _decision(mg.check(subject, q["resource"][0])))
+ if u.path == "/v1/usage":
+ return self._send(200, {r: _decision(d) for r, d in mg.usage(q["subject"][0]).items()})
+ if u.path == "/v1/over":
+ res = q["resource"][0]
+ return self._send(200, {"resource": res, "subjects": mg.over_quota(res)})
+ if u.path == "/v1/top":
+ res = q["resource"][0]
+ n = int(q.get("n", ["20"])[0])
+ return self._send(200, {"resource": res, "top": mg.storage.top(res, mg.period.key(), n)})
+ if u.path == "/v1/groups":
+ return self._send(200, {g: mg.storage.get_group_limits(g) for g in mg.storage.list_groups()})
+ return self._send(404, {"error": "not found"})
+ except KeyError as e:
+ return self._send(400, {"error": f"missing query param {e}"})
+ except MeterGateError as e:
+ return self._send(400, {"error": str(e)})
+
+ def do_POST(self):
+ u = urlparse(self.path)
+ if not self._authed():
+ return self._send(401, {"error": "unauthorized"})
+ length = int(self.headers.get("Content-Length", 0) or 0)
+ try:
+ data = json.loads(self.rfile.read(length) or b"{}")
+ except (ValueError, TypeError):
+ return self._send(400, {"error": "invalid json body"})
+ try:
+ if u.path == "/v1/use":
+ return self._send(200, _decision(mg.use(data["subject"], data["resource"], data["amount"])))
+ if u.path == "/v1/groups":
+ mg.set_group_limit(data["group"], data["resource"], data["limit"])
+ return self._send(200, {"ok": True})
+ if u.path == "/v1/subjects":
+ if "group" in data:
+ mg.set_subject(data["subject"], data["group"])
+ for resource, limit in (data.get("override") or {}).items():
+ mg.set_override(data["subject"], resource, limit)
+ return self._send(200, {"ok": True})
+ return self._send(404, {"error": "not found"})
+ except KeyError as e:
+ return self._send(400, {"error": f"missing field {e}"})
+ except MeterGateError as e:
+ return self._send(400, {"error": str(e)})
+
+ return Handler
+
+
+def serve(mg: MeterGate, host: str = "127.0.0.1", port: int = 8400, token: str | None = None) -> None:
+ httpd = HTTPServer((host, port), make_handler(mg, token))
+ try:
+ httpd.serve_forever()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ httpd.server_close()
diff --git a/src/metergate/sizes.py b/src/metergate/sizes.py
new file mode 100644
index 0000000..69a401d
--- /dev/null
+++ b/src/metergate/sizes.py
@@ -0,0 +1,43 @@
+"""Human-readable byte sizes. K/M/G/T are binary (1024-based)."""
+from __future__ import annotations
+
+_FACTORS = {
+ "B": 1,
+ "K": 1024, "KB": 1024, "KIB": 1024,
+ "M": 1024**2, "MB": 1024**2, "MIB": 1024**2,
+ "G": 1024**3, "GB": 1024**3, "GIB": 1024**3,
+ "T": 1024**4, "TB": 1024**4, "TIB": 1024**4,
+}
+_ORDER = [("TiB", 1024**4), ("GiB", 1024**3), ("MiB", 1024**2), ("KiB", 1024), ("B", 1)]
+
+
+def parse_size(value: str | int) -> int:
+ """Parse an int or a string like '500MB' / '50 GiB' / '1024' into bytes."""
+ if isinstance(value, int):
+ return value
+ s = str(value).strip().replace(" ", "")
+ if not s:
+ raise ValueError("empty size")
+ i = len(s)
+ while i > 0 and (s[i - 1].isalpha()):
+ i -= 1
+ number, unit = s[:i], s[i:].upper()
+ try:
+ n = float(number)
+ except ValueError as e:
+ raise ValueError(f"invalid size: {value!r}") from e
+ if not unit:
+ return int(n)
+ if unit not in _FACTORS:
+ raise ValueError(f"unknown size unit: {unit!r}")
+ return int(n * _FACTORS[unit])
+
+
+def format_size(n: int) -> str:
+ """Render bytes as a compact binary size, e.g. 524288000 -> '500.0 MiB'."""
+ if n < 1024:
+ return f"{n} B"
+ for unit, factor in _ORDER:
+ if n >= factor:
+ return f"{n / factor:.1f} {unit}"
+ return f"{n} B"
diff --git a/src/metergate/storage/__init__.py b/src/metergate/storage/__init__.py
new file mode 100644
index 0000000..5d528e2
--- /dev/null
+++ b/src/metergate/storage/__init__.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+from .base import Storage
+from .memory import MemoryStorage
+from .sqlite import SQLiteStorage
+
+__all__ = ["Storage", "MemoryStorage", "SQLiteStorage", "storage_from_url"]
+
+
+def storage_from_url(url: str) -> Storage:
+ """Build a backend from a URL:
+
+ memory in-process (default)
+ sqlite:/abs/path.db SQLite file (sqlite::memory: for in-memory)
+ postgresql://user@host/db Postgres (extra: metergate[postgres])
+ redis://host:6379/0 Redis (extra: metergate[redis])
+ """
+ if url in ("memory", "memory://"):
+ return MemoryStorage()
+ if url.startswith("sqlite:"):
+ return SQLiteStorage(url[len("sqlite:"):] or ":memory:")
+ if url.startswith(("postgres://", "postgresql://")):
+ from .postgres import PostgresStorage
+ return PostgresStorage(url)
+ if url.startswith("redis://"):
+ from .redis import RedisStorage
+ return RedisStorage(url)
+ raise ValueError(f"unrecognised storage url: {url!r}")
diff --git a/src/metergate/storage/base.py b/src/metergate/storage/base.py
new file mode 100644
index 0000000..80919d7
--- /dev/null
+++ b/src/metergate/storage/base.py
@@ -0,0 +1,54 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+
+class Storage(ABC):
+ """Backend interface. A limit of None means 'unlimited' / 'unset'."""
+
+ # ── groups (named tiers, each with per-resource limits) ─────────────────
+ @abstractmethod
+ def set_group_limit(self, group: str, resource: str, limit: int | None) -> None: ...
+ @abstractmethod
+ def get_group_limits(self, group: str) -> dict[str, int]: ...
+ @abstractmethod
+ def list_groups(self) -> list[str]: ...
+ @abstractmethod
+ def delete_group(self, group: str) -> None: ...
+
+ # ── capabilities (an action gated by one or more resources) ─────────────
+ @abstractmethod
+ def set_capability(self, name: str, resources: list[str]) -> None: ...
+ @abstractmethod
+ def get_capability(self, name: str) -> list[str] | None: ...
+ @abstractmethod
+ def list_capabilities(self) -> dict[str, list[str]]: ...
+
+ # ── subjects (users): group membership + per-resource overrides ─────────
+ @abstractmethod
+ def set_subject_group(self, subject: str, group: str | None) -> None: ...
+ @abstractmethod
+ def get_subject_group(self, subject: str) -> str | None: ...
+ @abstractmethod
+ def set_override(self, subject: str, resource: str, limit: int | None) -> None: ...
+ @abstractmethod
+ def get_override(self, subject: str, resource: str) -> int | None: ...
+ @abstractmethod
+ def get_overrides(self, subject: str) -> dict[str, int]: ...
+ @abstractmethod
+ def list_subjects(self) -> list[str]: ...
+
+ # ── usage (per subject, resource, period bucket) ────────────────────────
+ @abstractmethod
+ def add_usage(self, subject: str, resource: str, period: str, amount: int) -> int: ...
+ @abstractmethod
+ def get_usage(self, subject: str, resource: str, period: str) -> int: ...
+ @abstractmethod
+ def get_all_usage(self, subject: str, period: str) -> dict[str, int]: ...
+ @abstractmethod
+ def reset_usage(self, subject: str, period: str | None = None) -> None: ...
+ @abstractmethod
+ def top(self, resource: str, period: str, n: int) -> list[tuple[str, int]]: ...
+
+ def close(self) -> None: # optional
+ pass
diff --git a/src/metergate/storage/memory.py b/src/metergate/storage/memory.py
new file mode 100644
index 0000000..b6c6391
--- /dev/null
+++ b/src/metergate/storage/memory.py
@@ -0,0 +1,104 @@
+from __future__ import annotations
+
+import threading
+
+from .base import Storage
+
+
+class MemoryStorage(Storage):
+ """Process-local backend. Useful for tests, single-process apps, and as a
+ cache layer. Thread-safe; not shared across processes."""
+
+ def __init__(self) -> None:
+ self._groups: dict[str, dict[str, int]] = {}
+ self._caps: dict[str, list[str]] = {}
+ self._subject_group: dict[str, str] = {}
+ self._overrides: dict[str, dict[str, int]] = {}
+ self._usage: dict[tuple[str, str, str], int] = {}
+ self._lock = threading.Lock()
+
+ def set_group_limit(self, group, resource, limit):
+ with self._lock:
+ limits = self._groups.setdefault(group, {})
+ if limit is None:
+ limits.pop(resource, None)
+ else:
+ limits[resource] = int(limit)
+
+ def get_group_limits(self, group):
+ return dict(self._groups.get(group, {}))
+
+ def list_groups(self):
+ return sorted(self._groups)
+
+ def delete_group(self, group):
+ with self._lock:
+ self._groups.pop(group, None)
+
+ def set_capability(self, name, resources):
+ with self._lock:
+ self._caps[name] = list(resources)
+
+ def get_capability(self, name):
+ c = self._caps.get(name)
+ return list(c) if c is not None else None
+
+ def list_capabilities(self):
+ return {k: list(v) for k, v in self._caps.items()}
+
+ def set_subject_group(self, subject, group):
+ with self._lock:
+ if group is None:
+ self._subject_group.pop(subject, None)
+ else:
+ self._subject_group[subject] = group
+
+ def get_subject_group(self, subject):
+ return self._subject_group.get(subject)
+
+ def set_override(self, subject, resource, limit):
+ with self._lock:
+ ov = self._overrides.setdefault(subject, {})
+ if limit is None:
+ ov.pop(resource, None)
+ else:
+ ov[resource] = int(limit)
+
+ def get_override(self, subject, resource):
+ return self._overrides.get(subject, {}).get(resource)
+
+ def get_overrides(self, subject):
+ return dict(self._overrides.get(subject, {}))
+
+ def list_subjects(self):
+ seen = set(self._subject_group) | set(self._overrides)
+ seen |= {s for (s, _, _) in self._usage}
+ return sorted(seen)
+
+ def add_usage(self, subject, resource, period, amount):
+ key = (subject, resource, period)
+ with self._lock:
+ self._usage[key] = self._usage.get(key, 0) + int(amount)
+ return self._usage[key]
+
+ def get_usage(self, subject, resource, period):
+ return self._usage.get((subject, resource, period), 0)
+
+ def get_all_usage(self, subject, period):
+ return {
+ r: v for (s, r, p), v in self._usage.items()
+ if s == subject and p == period
+ }
+
+ def reset_usage(self, subject, period=None):
+ with self._lock:
+ for key in [k for k in self._usage if k[0] == subject and (period is None or k[2] == period)]:
+ del self._usage[key]
+
+ def top(self, resource, period, n):
+ rows = [
+ (s, v) for (s, r, p), v in self._usage.items()
+ if r == resource and p == period
+ ]
+ rows.sort(key=lambda kv: kv[1], reverse=True)
+ return rows[:n]
diff --git a/src/metergate/storage/postgres.py b/src/metergate/storage/postgres.py
new file mode 100644
index 0000000..acd7a52
--- /dev/null
+++ b/src/metergate/storage/postgres.py
@@ -0,0 +1,145 @@
+from __future__ import annotations
+
+from .base import Storage
+
+_SCHEMA = (
+ "CREATE TABLE IF NOT EXISTS mg_group_limits (grp text, resource text, lim bigint NOT NULL, PRIMARY KEY (grp, resource))",
+ "CREATE TABLE IF NOT EXISTS mg_caps (name text PRIMARY KEY, resources jsonb NOT NULL)",
+ "CREATE TABLE IF NOT EXISTS mg_subjects (subject text PRIMARY KEY, grp text)",
+ "CREATE TABLE IF NOT EXISTS mg_overrides (subject text, resource text, lim bigint NOT NULL, PRIMARY KEY (subject, resource))",
+ "CREATE TABLE IF NOT EXISTS mg_usage (subject text, resource text, period text, amount bigint NOT NULL DEFAULT 0, PRIMARY KEY (subject, resource, period))",
+ "CREATE INDEX IF NOT EXISTS mg_usage_rp ON mg_usage (resource, period)",
+)
+
+
+class PostgresStorage(Storage):
+ """Postgres backend (extra: metergate[postgres]). Safe for concurrent
+ processes — increments are atomic via INSERT ... ON CONFLICT."""
+
+ def __init__(self, dsn: str, init_schema: bool = True):
+ import psycopg
+ from psycopg.types.json import Json
+
+ self._json = Json
+ self._conn = psycopg.connect(dsn, autocommit=True)
+ if init_schema:
+ with self._conn.cursor() as cur:
+ for stmt in _SCHEMA:
+ cur.execute(stmt)
+
+ def _all(self, sql, params=()):
+ with self._conn.cursor() as cur:
+ cur.execute(sql, params)
+ return cur.fetchall()
+
+ def _one(self, sql, params=()):
+ with self._conn.cursor() as cur:
+ cur.execute(sql, params)
+ return cur.fetchone()
+
+ def _exec(self, sql, params=()):
+ with self._conn.cursor() as cur:
+ cur.execute(sql, params)
+
+ def set_group_limit(self, group, resource, limit):
+ if limit is None:
+ self._exec("DELETE FROM mg_group_limits WHERE grp=%s AND resource=%s", (group, resource))
+ else:
+ self._exec(
+ "INSERT INTO mg_group_limits(grp,resource,lim) VALUES(%s,%s,%s) "
+ "ON CONFLICT (grp,resource) DO UPDATE SET lim=EXCLUDED.lim",
+ (group, resource, int(limit)),
+ )
+
+ def get_group_limits(self, group):
+ return {r: l for r, l in self._all("SELECT resource,lim FROM mg_group_limits WHERE grp=%s", (group,))}
+
+ def list_groups(self):
+ return [r[0] for r in self._all("SELECT DISTINCT grp FROM mg_group_limits ORDER BY grp")]
+
+ def delete_group(self, group):
+ self._exec("DELETE FROM mg_group_limits WHERE grp=%s", (group,))
+
+ def set_capability(self, name, resources):
+ self._exec(
+ "INSERT INTO mg_caps(name,resources) VALUES(%s,%s) "
+ "ON CONFLICT (name) DO UPDATE SET resources=EXCLUDED.resources",
+ (name, self._json(list(resources))),
+ )
+
+ def get_capability(self, name):
+ row = self._one("SELECT resources FROM mg_caps WHERE name=%s", (name,))
+ return list(row[0]) if row else None
+
+ def list_capabilities(self):
+ return {n: list(r) for n, r in self._all("SELECT name,resources FROM mg_caps")}
+
+ def set_subject_group(self, subject, group):
+ if group is None:
+ self._exec("DELETE FROM mg_subjects WHERE subject=%s", (subject,))
+ else:
+ self._exec(
+ "INSERT INTO mg_subjects(subject,grp) VALUES(%s,%s) "
+ "ON CONFLICT (subject) DO UPDATE SET grp=EXCLUDED.grp",
+ (subject, group),
+ )
+
+ def get_subject_group(self, subject):
+ row = self._one("SELECT grp FROM mg_subjects WHERE subject=%s", (subject,))
+ return row[0] if row else None
+
+ def set_override(self, subject, resource, limit):
+ if limit is None:
+ self._exec("DELETE FROM mg_overrides WHERE subject=%s AND resource=%s", (subject, resource))
+ else:
+ self._exec(
+ "INSERT INTO mg_overrides(subject,resource,lim) VALUES(%s,%s,%s) "
+ "ON CONFLICT (subject,resource) DO UPDATE SET lim=EXCLUDED.lim",
+ (subject, resource, int(limit)),
+ )
+
+ def get_override(self, subject, resource):
+ row = self._one("SELECT lim FROM mg_overrides WHERE subject=%s AND resource=%s", (subject, resource))
+ return row[0] if row else None
+
+ def get_overrides(self, subject):
+ return {r: l for r, l in self._all("SELECT resource,lim FROM mg_overrides WHERE subject=%s", (subject,))}
+
+ def list_subjects(self):
+ rows = self._all(
+ "SELECT subject FROM mg_subjects UNION SELECT subject FROM mg_overrides "
+ "UNION SELECT subject FROM mg_usage")
+ return sorted(r[0] for r in rows)
+
+ def add_usage(self, subject, resource, period, amount):
+ row = self._one(
+ "INSERT INTO mg_usage(subject,resource,period,amount) VALUES(%s,%s,%s,%s) "
+ "ON CONFLICT (subject,resource,period) DO UPDATE SET amount=mg_usage.amount+EXCLUDED.amount "
+ "RETURNING amount",
+ (subject, resource, period, int(amount)),
+ )
+ return row[0]
+
+ def get_usage(self, subject, resource, period):
+ row = self._one(
+ "SELECT amount FROM mg_usage WHERE subject=%s AND resource=%s AND period=%s",
+ (subject, resource, period))
+ return row[0] if row else 0
+
+ def get_all_usage(self, subject, period):
+ return {r: a for r, a in self._all(
+ "SELECT resource,amount FROM mg_usage WHERE subject=%s AND period=%s", (subject, period))}
+
+ def reset_usage(self, subject, period=None):
+ if period is None:
+ self._exec("DELETE FROM mg_usage WHERE subject=%s", (subject,))
+ else:
+ self._exec("DELETE FROM mg_usage WHERE subject=%s AND period=%s", (subject, period))
+
+ def top(self, resource, period, n):
+ return [(s, a) for s, a in self._all(
+ "SELECT subject,amount FROM mg_usage WHERE resource=%s AND period=%s ORDER BY amount DESC LIMIT %s",
+ (resource, period, n))]
+
+ def close(self):
+ self._conn.close()
diff --git a/src/metergate/storage/redis.py b/src/metergate/storage/redis.py
new file mode 100644
index 0000000..a56bdbf
--- /dev/null
+++ b/src/metergate/storage/redis.py
@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import json
+
+from .base import Storage
+
+
+class RedisStorage(Storage):
+ """Redis backend (extra: metergate[redis]). Increments are atomic (INCRBY);
+ auxiliary sets/sorted-sets back `top`, `over`, and listing."""
+
+ def __init__(self, url: str, prefix: str = "mg:"):
+ import redis
+
+ self._r = redis.Redis.from_url(url, decode_responses=True)
+ self._p = prefix
+
+ def _g(self, group): return f"{self._p}g:{group}"
+ def _ov(self, subject): return f"{self._p}ov:{subject}"
+ def _u(self, subject, resource, period): return f"{self._p}u:{subject}:{resource}:{period}"
+ def _z(self, resource, period): return f"{self._p}z:{resource}:{period}"
+ def _rs(self, subject, period): return f"{self._p}rs:{subject}:{period}"
+ def _sp(self, subject): return f"{self._p}sp:{subject}"
+
+ def set_group_limit(self, group, resource, limit):
+ if limit is None:
+ self._r.hdel(self._g(group), resource)
+ if not self._r.exists(self._g(group)):
+ self._r.srem(f"{self._p}groups", group)
+ else:
+ self._r.hset(self._g(group), resource, int(limit))
+ self._r.sadd(f"{self._p}groups", group)
+
+ def get_group_limits(self, group):
+ return {k: int(v) for k, v in self._r.hgetall(self._g(group)).items()}
+
+ def list_groups(self):
+ return sorted(self._r.smembers(f"{self._p}groups"))
+
+ def delete_group(self, group):
+ self._r.delete(self._g(group))
+ self._r.srem(f"{self._p}groups", group)
+
+ def set_capability(self, name, resources):
+ self._r.hset(f"{self._p}caps", name, json.dumps(list(resources)))
+
+ def get_capability(self, name):
+ v = self._r.hget(f"{self._p}caps", name)
+ return json.loads(v) if v is not None else None
+
+ def list_capabilities(self):
+ return {k: json.loads(v) for k, v in self._r.hgetall(f"{self._p}caps").items()}
+
+ def set_subject_group(self, subject, group):
+ if group is None:
+ self._r.hdel(f"{self._p}sg", subject)
+ else:
+ self._r.hset(f"{self._p}sg", subject, group)
+ self._r.sadd(f"{self._p}subjects", subject)
+
+ def get_subject_group(self, subject):
+ return self._r.hget(f"{self._p}sg", subject)
+
+ def set_override(self, subject, resource, limit):
+ if limit is None:
+ self._r.hdel(self._ov(subject), resource)
+ else:
+ self._r.hset(self._ov(subject), resource, int(limit))
+ self._r.sadd(f"{self._p}subjects", subject)
+
+ def get_override(self, subject, resource):
+ v = self._r.hget(self._ov(subject), resource)
+ return int(v) if v is not None else None
+
+ def get_overrides(self, subject):
+ return {k: int(v) for k, v in self._r.hgetall(self._ov(subject)).items()}
+
+ def list_subjects(self):
+ return sorted(self._r.smembers(f"{self._p}subjects"))
+
+ def add_usage(self, subject, resource, period, amount):
+ new = self._r.incrby(self._u(subject, resource, period), int(amount))
+ self._r.zadd(self._z(resource, period), {subject: new})
+ self._r.sadd(self._rs(subject, period), resource)
+ self._r.sadd(self._sp(subject), period)
+ self._r.sadd(f"{self._p}subjects", subject)
+ return new
+
+ def get_usage(self, subject, resource, period):
+ v = self._r.get(self._u(subject, resource, period))
+ return int(v) if v is not None else 0
+
+ def get_all_usage(self, subject, period):
+ return {r: self.get_usage(subject, r, period) for r in self._r.smembers(self._rs(subject, period))}
+
+ def reset_usage(self, subject, period=None):
+ periods = [period] if period else list(self._r.smembers(self._sp(subject)))
+ for p in periods:
+ for r in self._r.smembers(self._rs(subject, p)):
+ self._r.delete(self._u(subject, r, p))
+ self._r.zrem(self._z(r, p), subject)
+ self._r.delete(self._rs(subject, p))
+ self._r.srem(self._sp(subject), p)
+
+ def top(self, resource, period, n):
+ rows = self._r.zrevrange(self._z(resource, period), 0, max(0, n - 1), withscores=True)
+ return [(s, int(a)) for s, a in rows]
+
+ def close(self):
+ try:
+ self._r.close()
+ except Exception:
+ pass
diff --git a/src/metergate/storage/sqlite.py b/src/metergate/storage/sqlite.py
new file mode 100644
index 0000000..d60c70a
--- /dev/null
+++ b/src/metergate/storage/sqlite.py
@@ -0,0 +1,139 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+import threading
+
+from .base import Storage
+
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS mg_group_limits (grp TEXT, resource TEXT, lim INTEGER NOT NULL, PRIMARY KEY (grp, resource));
+CREATE TABLE IF NOT EXISTS mg_caps (name TEXT PRIMARY KEY, resources TEXT NOT NULL);
+CREATE TABLE IF NOT EXISTS mg_subjects (subject TEXT PRIMARY KEY, grp TEXT);
+CREATE TABLE IF NOT EXISTS mg_overrides (subject TEXT, resource TEXT, lim INTEGER NOT NULL, PRIMARY KEY (subject, resource));
+CREATE TABLE IF NOT EXISTS mg_usage (subject TEXT, resource TEXT, period TEXT, amount INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (subject, resource, period));
+CREATE INDEX IF NOT EXISTS mg_usage_rp ON mg_usage (resource, period);
+"""
+
+
+class SQLiteStorage(Storage):
+ def __init__(self, path: str = ":memory:"):
+ self._db = sqlite3.connect(path, check_same_thread=False)
+ self._db.execute("PRAGMA journal_mode=WAL")
+ self._db.executescript(_SCHEMA)
+ self._db.commit()
+ self._lock = threading.Lock()
+
+ def _write(self, sql: str, params=()):
+ with self._lock:
+ self._db.execute(sql, params)
+ self._db.commit()
+
+ def set_group_limit(self, group, resource, limit):
+ if limit is None:
+ self._write("DELETE FROM mg_group_limits WHERE grp=? AND resource=?", (group, resource))
+ else:
+ self._write(
+ "INSERT INTO mg_group_limits(grp,resource,lim) VALUES(?,?,?) "
+ "ON CONFLICT(grp,resource) DO UPDATE SET lim=excluded.lim",
+ (group, resource, int(limit)),
+ )
+
+ def get_group_limits(self, group):
+ return {r: l for r, l in self._db.execute(
+ "SELECT resource,lim FROM mg_group_limits WHERE grp=?", (group,))}
+
+ def list_groups(self):
+ return [r[0] for r in self._db.execute("SELECT DISTINCT grp FROM mg_group_limits ORDER BY grp")]
+
+ def delete_group(self, group):
+ self._write("DELETE FROM mg_group_limits WHERE grp=?", (group,))
+
+ def set_capability(self, name, resources):
+ self._write(
+ "INSERT INTO mg_caps(name,resources) VALUES(?,?) "
+ "ON CONFLICT(name) DO UPDATE SET resources=excluded.resources",
+ (name, json.dumps(list(resources))),
+ )
+
+ def get_capability(self, name):
+ row = self._db.execute("SELECT resources FROM mg_caps WHERE name=?", (name,)).fetchone()
+ return json.loads(row[0]) if row else None
+
+ def list_capabilities(self):
+ return {n: json.loads(r) for n, r in self._db.execute("SELECT name,resources FROM mg_caps")}
+
+ def set_subject_group(self, subject, group):
+ if group is None:
+ self._write("DELETE FROM mg_subjects WHERE subject=?", (subject,))
+ else:
+ self._write(
+ "INSERT INTO mg_subjects(subject,grp) VALUES(?,?) "
+ "ON CONFLICT(subject) DO UPDATE SET grp=excluded.grp",
+ (subject, group),
+ )
+
+ def get_subject_group(self, subject):
+ row = self._db.execute("SELECT grp FROM mg_subjects WHERE subject=?", (subject,)).fetchone()
+ return row[0] if row else None
+
+ def set_override(self, subject, resource, limit):
+ if limit is None:
+ self._write("DELETE FROM mg_overrides WHERE subject=? AND resource=?", (subject, resource))
+ else:
+ self._write(
+ "INSERT INTO mg_overrides(subject,resource,lim) VALUES(?,?,?) "
+ "ON CONFLICT(subject,resource) DO UPDATE SET lim=excluded.lim",
+ (subject, resource, int(limit)),
+ )
+
+ def get_override(self, subject, resource):
+ row = self._db.execute(
+ "SELECT lim FROM mg_overrides WHERE subject=? AND resource=?", (subject, resource)).fetchone()
+ return row[0] if row else None
+
+ def get_overrides(self, subject):
+ return {r: l for r, l in self._db.execute(
+ "SELECT resource,lim FROM mg_overrides WHERE subject=?", (subject,))}
+
+ def list_subjects(self):
+ q = ("SELECT subject FROM mg_subjects UNION SELECT subject FROM mg_overrides "
+ "UNION SELECT subject FROM mg_usage")
+ return sorted(r[0] for r in self._db.execute(q))
+
+ def add_usage(self, subject, resource, period, amount):
+ with self._lock:
+ self._db.execute(
+ "INSERT INTO mg_usage(subject,resource,period,amount) VALUES(?,?,?,?) "
+ "ON CONFLICT(subject,resource,period) DO UPDATE SET amount=amount+excluded.amount",
+ (subject, resource, period, int(amount)),
+ )
+ row = self._db.execute(
+ "SELECT amount FROM mg_usage WHERE subject=? AND resource=? AND period=?",
+ (subject, resource, period)).fetchone()
+ self._db.commit()
+ return row[0]
+
+ def get_usage(self, subject, resource, period):
+ row = self._db.execute(
+ "SELECT amount FROM mg_usage WHERE subject=? AND resource=? AND period=?",
+ (subject, resource, period)).fetchone()
+ return row[0] if row else 0
+
+ def get_all_usage(self, subject, period):
+ return {r: a for r, a in self._db.execute(
+ "SELECT resource,amount FROM mg_usage WHERE subject=? AND period=?", (subject, period))}
+
+ def reset_usage(self, subject, period=None):
+ if period is None:
+ self._write("DELETE FROM mg_usage WHERE subject=?", (subject,))
+ else:
+ self._write("DELETE FROM mg_usage WHERE subject=? AND period=?", (subject, period))
+
+ def top(self, resource, period, n):
+ return [(s, a) for s, a in self._db.execute(
+ "SELECT subject,amount FROM mg_usage WHERE resource=? AND period=? ORDER BY amount DESC LIMIT ?",
+ (resource, period, n))]
+
+ def close(self):
+ self._db.close()
diff --git a/tests/test_metergate.py b/tests/test_metergate.py
new file mode 100644
index 0000000..4794420
--- /dev/null
+++ b/tests/test_metergate.py
@@ -0,0 +1,128 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from metergate import MemoryStorage, MeterGate, Period, SQLiteStorage, format_size, parse_size
+
+
+def stores():
+ return [MemoryStorage(), SQLiteStorage(":memory:")]
+
+
+def _seed(store):
+ mg = MeterGate(store, period="month")
+ mg.define_group("free", {"media": 100}).define_group("pro", {"media": 1000})
+ mg.define_capability("upload", ["media"])
+ return mg
+
+
+@pytest.mark.parametrize("store", stores())
+def test_limit_resolution(store):
+ mg = _seed(store)
+ mg.set_subject("a", "free")
+ assert mg.limit_for("a", "media") == 100
+ mg.set_override("a", "media", 250)
+ assert mg.limit_for("a", "media") == 250 # override wins
+ mg.set_override("a", "media", None)
+ assert mg.limit_for("a", "media") == 100 # falls back to group
+ assert mg.limit_for("unknown", "media") is None # no group = unlimited
+
+
+@pytest.mark.parametrize("store", stores())
+def test_use_check_and_gate(store):
+ mg = _seed(store)
+ mg.set_subject("a", "free")
+ mg.use("a", "media", 90)
+ assert mg.check("a", "media").allowed
+ assert mg.allowed("a", "upload")
+ d = mg.use("a", "media", 20) # 110 > 100
+ assert d.over and d.used == 110
+ assert not mg.allowed("a", "upload") # capability now denied
+ assert "a" in mg.over_quota("media")
+ assert mg.check("a", "media").remaining == 0
+
+
+@pytest.mark.parametrize("store", stores())
+def test_top_and_reset(store):
+ mg = _seed(store)
+ mg.use("a", "media", 50)
+ mg.use("b", "media", 80)
+ assert mg.storage.top("media", mg.period.key(), 5)[0][0] == "b"
+ mg.reset("b")
+ assert mg.check("b", "media").used == 0
+
+
+def test_period_reset_across_month():
+ mg = MeterGate(MemoryStorage(), period="month")
+ mg.define_group("g", {"r": 10})
+ mg.set_subject("a", "g")
+ jan = datetime(2026, 1, 20, tzinfo=timezone.utc)
+ feb = datetime(2026, 2, 1, tzinfo=timezone.utc)
+ mg.use("a", "r", 10, now=jan)
+ assert mg.check("a", "r", now=jan).over
+ assert mg.check("a", "r", now=feb).used == 0 # fresh allowance next month
+
+
+def test_period_keys():
+ n = datetime(2026, 6, 14, 12, 0, tzinfo=timezone.utc)
+ assert Period("month").key(n) == "2026-06"
+ assert Period("day").key(n) == "2026-06-14"
+ assert Period("never").key(n) == "all"
+ assert Period("month").reset_at(n) == datetime(2026, 7, 1, tzinfo=timezone.utc)
+
+
+def test_sizes():
+ assert parse_size("500MB") == 500 * 1024 ** 2
+ assert parse_size("1GiB") == 1024 ** 3
+ assert parse_size(2048) == 2048
+ assert "MiB" in format_size(parse_size("5MB"))
+ with pytest.raises(ValueError):
+ parse_size("12 furlongs")
+
+
+def test_http_api():
+ import json as _json
+ import threading
+ import urllib.request
+ from http.server import HTTPServer
+
+ from metergate.server import make_handler
+
+ mg = MeterGate(MemoryStorage(), period="month")
+ mg.define_group("free", {"media": 100})
+ mg.define_capability("up", ["media"])
+ mg.set_subject("a", "free")
+
+ httpd = HTTPServer(("127.0.0.1", 0), make_handler(mg, token=None))
+ port = httpd.server_address[1]
+ threading.Thread(target=httpd.serve_forever, daemon=True).start()
+
+ def post(path, payload):
+ req = urllib.request.Request(
+ f"http://127.0.0.1:{port}{path}", data=_json.dumps(payload).encode(),
+ headers={"Content-Type": "application/json"}, method="POST")
+ return _json.load(urllib.request.urlopen(req))
+
+ def get(path):
+ return _json.load(urllib.request.urlopen(f"http://127.0.0.1:{port}{path}"))
+
+ try:
+ assert post("/v1/use", {"subject": "a", "resource": "media", "amount": 90})["used"] == 90
+ assert get("/v1/check?subject=a&capability=up")["allowed"] is True
+ post("/v1/use", {"subject": "a", "resource": "media", "amount": 20}) # 110 > 100
+ assert get("/v1/check?subject=a&capability=up")["allowed"] is False
+ assert "a" in get("/v1/over?resource=media")["subjects"]
+ finally:
+ httpd.shutdown()
+
+
+def test_cli(tmp_path):
+ from metergate.cli import main
+ db = f"sqlite:{tmp_path}/q.db"
+ assert main(["--storage", db, "group", "set", "free", "media=100"]) == 0
+ assert main(["--storage", db, "cap", "set", "upload", "media"]) == 0
+ assert main(["--storage", db, "user", "set", "a", "--group", "free"]) == 0
+ main(["--storage", db, "use", "a", "media", "90"])
+ assert main(["--storage", db, "check", "a", "--cap", "upload"]) == 0 # allowed
+ main(["--storage", db, "use", "a", "media", "20"])
+ assert main(["--storage", db, "check", "a", "--cap", "upload"]) == 1 # denied