Below is a drop-in, on-prem friendly telemetry + pricing brain for CHORUS. It’s opinionated, PostgreSQL-first, offline-capable, and designed to drive upsell _without_ metered compute.

# KACHING: design spec (schema + event flow)

## What it does

- Collects per-job agent metrics (CPU/GPU seconds, RAM peak, I/O, context ops).
- Rolls up to hourly/daily org-level usage.
- Compares usage against **license/tier limits** and **budgets**.
- Emits upgrade suggestions and alerts.
- (Optional) Pushes redacted rollups to HQ for global analytics.

---

## 1) Data model (PostgreSQL)

**Core entities**

- `org` ← a customer (keep this even on single-tenant installs; it future-proofs the schema).
- `deployment` ← an installation of CHORUS under an org.
- `node` ← a physical/virtual machine running agents.
- `agent` ← a logical worker (e.g., `SLURP_ingest`, `UCXL_resolver`).
- `job` ← a unit of work (has a UCXL address).
- `usage_sample` ← raw metrics per job.
- `usage_rollup_*` ← materialized summaries.
- `license`, `pricing_plan`, `quota`, `budget` ← monetisation controls.
- `feature_flag` ← enables premium capabilities.
- `alert`, `suggestion` ← user-facing nudges/notifications.
- `api_key`, `ingest_token` ← auth for push events.

> Partition time-series tables by day; use `pg_partman` or native declarative partitioning.

**DDL (core)**

```sql
-- orgs & deployments
create table org (
  org_id uuid primary key,
  name text not null,
  created_at timestamptz not null default now()
);

create table deployment (
  deployment_id uuid primary key,
  org_id uuid not null references org(org_id),
  name text not null,
  timezone text not null default 'UTC',
  created_at timestamptz not null default now()
);

-- nodes & agents
create table node (
  node_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  hostname text not null,
  hw_class text, -- e.g., "i5-12400 + RTX3060"
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

create table agent (
  agent_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  node_id uuid references node(node_id),
  agent_type text not null, -- e.g., "SLURP_ingest"
  version text,
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

-- jobs
create table job (
  job_id uuid primary key,
  agent_id uuid not null references agent(agent_id),
  ucxl_addr text not null,
  started_at timestamptz not null,
  ended_at timestamptz,
  status text not null check (status in ('running','succeeded','failed','canceled')),
  correlation_id text, -- idempotency key from caller
  meta jsonb not null default '{}'
);

-- raw usage (partition by day on observed_at)
create table usage_sample (
  sample_id uuid primary key,
  job_id uuid not null references job(job_id),
  observed_at timestamptz not null,
  cpu_seconds numeric(18,6) not null default 0,
  gpu_seconds numeric(18,6) not null default 0,
  ram_mb_peak numeric(18,3) not null default 0,
  disk_io_mb numeric(18,3) not null default 0,
  net_in_mb numeric(18,3) not null default 0,
  net_out_mb numeric(18,3) not null default 0,
  context_reads integer not null default 0,
  context_writes integer not null default 0,
  context_bytes_read bigint not null default 0,
  context_bytes_written bigint not null default 0,
  model_name text, -- if any LLM was used (local/cloud)
  model_tokens_in bigint default 0,
  model_tokens_out bigint default 0,
  flags jsonb not null default '{}'
);

-- rollups (hourly & daily)
create table usage_rollup_hourly (
  deployment_id uuid not null references deployment(deployment_id),
  bucket_start timestamptz not null, -- aligned to the hour
  agent_type text not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  ram_mb_peak numeric(18,3) not null,
  net_in_mb numeric(18,3) not null,
  net_out_mb numeric(18,3) not null,
  context_reads bigint not null,
  context_writes bigint not null,
  context_bytes_read bigint not null,
  context_bytes_written bigint not null,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  jobs_succeeded bigint not null,
  jobs_failed bigint not null,
  primary key (deployment_id, bucket_start, agent_type)
);

create table usage_rollup_daily (
  deployment_id uuid not null references deployment(deployment_id),
  day date not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  context_bytes_written bigint not null,
  seats_active integer not null default 0,
  nodes_active integer not null default 0,
  orchestration_peak_concurrency integer not null default 0,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  primary key (deployment_id, day)
);

-- licensing / pricing
create table pricing_plan (
  plan_id text primary key, -- e.g., 'SMB_Pro', 'Mid_Business'
  meta jsonb not null -- published plan limits/features
);

create table license (
  license_id uuid primary key,
  org_id uuid not null references org(org_id),
  plan_id text not null references pricing_plan(plan_id),
  seats_limit integer,
  nodes_limit integer,
  features jsonb not null, -- e.g., {"temporal_nav": true, "federation": false}
  valid_from date not null,
  valid_to date not null,
  signed_blob bytea not null -- vendor-signed license
);

create table quota (
  quota_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  name text not null, -- e.g., 'context_bytes', 'temporal_queries'
  period text not null, -- 'daily','monthly','rolling_30d'
  hard_limit bigint, -- null => unlimited in plan
  soft_threshold bigint, -- trigger suggestion at e.g. 80%
  created_at timestamptz not null default now()
);

create table budget (
  budget_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  scope text not null, -- 'ingest','reason','orchestration'
  period text not null, -- 'daily','weekly','monthly'
  limit_units numeric(18,6) not null, -- arbitrary unit (e.g., cpu_seconds)
  action text not null, -- 'warn','throttle','block','fallback_model'
  created_at timestamptz not null default now()
);

-- alerts & suggestions
create table alert (
  alert_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  severity text not null check (severity in ('info','warn','error')),
  code text not null, -- e.g., 'QUOTA_NEARING'
  message text not null,
  context jsonb not null default '{}',
  acknowledged boolean not null default false
);

create table suggestion (
  suggestion_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  kind text not null, -- 'upgrade_tier','enable_feature','tune_pipeline'
  rationale text not null,
  target_plan text, -- suggested plan id
  diffs jsonb not null default '{}', -- what they gain quantitatively
  shown_to_user boolean not null default false,
  accepted boolean
);

-- auth for ingestion
create table ingest_token (
  token_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  token_hash bytea not null, -- store only the hash
  scopes text[] not null, -- ['ingest:usage','ingest:jobs']
  created_at timestamptz not null default now(),
  expires_at timestamptz
);

-- convenience: daily seat/node activity
create materialized view mv_daily_activity as
select
  d.deployment_id,
  date_trunc('day', j.started_at) as day,
  count(distinct a.agent_id) filter (where j.status = 'succeeded') as agents_active,
  count(distinct a.node_id) as nodes_active
from job j
join agent a on a.agent_id = j.agent_id
join deployment d on d.deployment_id = a.deployment_id
group by 1, 2;
```

Indexes you’ll want: `(job.agent_id)`, `(usage_sample.job_id)`, `(usage_sample.observed_at)`, `(usage_rollup_hourly.bucket_start)`, `(alert.deployment_id, created_at desc)`, `(quota.deployment_id, name)`.

---

## 2) Telemetry event flow

**On each agent run (end-of-job or periodic heartbeat):**

1. The agent emits a **signed** JSON payload to the local KACHING Ingest API.
2. KACHING validates the token + signature and dedups via `correlation_id`.
3. Raw `job` + `usage_sample` rows are persisted.
4. Events stream to the **Aggregator** worker (local queue: Redis Streams / NATS JetStream).
5. The Aggregator updates hourly/daily rollups and checks **quota/budget**.
6. If thresholds are breached → create an `alert` + `suggestion`.
7. (Optional) Post redacted rollups to HQ (batched, e.g., every 6 h) over mTLS.

**Event schema (agent → ingest)**

```json
{
  "schema": "kaching.v1.usage",
  "deployment_id": "6d0b1bcb-...-9c9d",
  "agent": {
    "agent_id": "0f32...ab",
    "agent_type": "SLURP_ingest",
    "version": "1.7.3"
  },
  "node": {
    "node_id": "4aa0...de",
    "hostname": "raven01",
    "hw_class": "Ryzen 5600 + RTX 3060 12G"
  },
  "job": {
    "job_id": "a3e8...90",
    "correlation_id": "ucxl:alpha:2025-08-14T08:03:05Z:42",
    "ucxl_addr": "ucxl://any:finance@project:alpha/*/report.md",
    "started_at": "2025-08-14T08:03:00Z",
    "ended_at": "2025-08-14T08:03:05Z",
    "status": "succeeded",
    "meta": { "workflow": "SLURP", "source": "Confluence" }
  },
  "metrics": {
    "cpu_seconds": 48.5,
    "gpu_seconds": 3.1,
    "ram_mb_peak": 520,
    "disk_io_mb": 18.2,
    "net_in_mb": 0.5,
    "net_out_mb": 1.2,
    "context_reads": 114,
    "context_writes": 7,
    "context_bytes_read": 812345,
    "context_bytes_written": 1280033
  },
  "llm": {
    "model_name": "llama3.1:8b-q4",
    "tokens_in": 1820,
    "tokens_out": 740,
    "provider": "local"
  },
  "sig": {
    "algo": "ed25519",
    "key_id": "agentkey-02",
    "signature": "<base64>"
  }
}
```

**Idempotency rules**

- `job.correlation_id` is required; duplicates are `UPSERT`ed (last write wins if timestamps advance).
- Clock skew is tolerated: if `ended_at` precedes `started_at` by a small delta, accept the event and flag it.
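
These two rules can be sketched as a minimal in-memory illustration (the five-second `SKEW_TOLERANCE` and the dict-backed store are assumptions; a production path would be a SQL `INSERT ... ON CONFLICT`):

```python
from datetime import datetime, timedelta

SKEW_TOLERANCE = timedelta(seconds=5)  # assumed "small delta"

def upsert_job(store, event):
    """Idempotent write keyed by correlation_id; a duplicate only
    overwrites if the new event's ended_at advances."""
    job = event["job"]
    key = job["correlation_id"]
    started = datetime.fromisoformat(job["started_at"])
    ended = datetime.fromisoformat(job["ended_at"])
    flags = {}
    if ended < started:
        if started - ended <= SKEW_TOLERANCE:
            flags["clock_skew"] = True  # accept but flag
        else:
            raise ValueError("ended_at precedes started_at beyond tolerance")
    existing = store.get(key)
    if existing is None or ended >= existing["ended_at"]:
        store[key] = {"started_at": started, "ended_at": ended, "flags": flags}
    return store[key]
```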

**Security**

- mTLS between agents and KACHING.
- Per-deployment `ingest_token` with narrow scopes.
- Payload signatures checked against the registered `key_id` (rotate keys quarterly).
- PII-free by default; redact `ucxl_addr` segments via local policy if needed.

---

## 3) Aggregation & thresholds

**Rollup jobs**

- Hourly: group by `deployment_id, agent_type, bucket_start`.
- Daily: group by `deployment_id, day`.
- Maintain **peak orchestration concurrency** from job overlap counts (an interval tree, or `btree_gist` indexing on `(started_at, ended_at)`).

**Threshold engine**

- Load effective limits from `license.features` + `quota`.
- Example checks:
    - Soft: `context_bytes_written` (30 d) ≥ 80% of the limit → `QUOTA_NEARING`.
    - Hard: `nodes_active` > `license.nodes_limit` → `HARD_BLOCK` (if policy says so).
    - Budget: `gpu_seconds` (daily) > budget → emit `BUDGET_EXCEEDED` with the policy action (`fallback_model`, etc.).

**Suggestion generator**

- Map overages to plan-ladder deltas (from your published `pricing_plan.meta`):
    - “Upgrade Pro → Business: +3× context, +multi-site federation”
- Compute **quantitative deltas** (e.g., “your 30-day context is 1.8× the current cap; Business raises the cap to 3.0×”).
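
The soft/hard check reduces to a small pure function; a sketch (the function names and the 80% default are illustrative assumptions; real limits come from `license.features` + `quota` rows):

```python
def evaluate_quota(usage, soft_threshold, hard_limit):
    """Classify a usage value against a quota row; returns an alert code or None."""
    if hard_limit is not None and usage > hard_limit:
        return "HARD_BLOCK"  # only enforced if policy says so
    if soft_threshold is not None and usage >= soft_threshold:
        return "QUOTA_NEARING"
    return None

def soft_threshold_from_limit(hard_limit, fraction=0.8):
    """Default the soft threshold to 80% of the hard limit."""
    return int(hard_limit * fraction)
```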
---

## 4) Example queries that drive the UI/upsell

**A) Soft quota nearing (30-day rolling)**

```sql
with windowed as (
  select deployment_id,
         sum(context_bytes_written) over (
           partition by deployment_id
           order by day
           rows between 29 preceding and current row
         ) as bytes_30d,
         day
  from usage_rollup_daily
)
select w.deployment_id, w.day, w.bytes_30d, q.soft_threshold, q.hard_limit
from windowed w
join quota q
  on q.deployment_id = w.deployment_id
 and q.name = 'context_bytes'
 and q.period = 'rolling_30d'
where w.bytes_30d >= q.soft_threshold;
```
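
The same window is easy to compute application-side; a stdlib sketch (the input shape, a day-sorted list of `(day, bytes)` tuples, is an assumption):

```python
def rolling_30d(daily):
    """daily: list of (day, context_bytes_written), sorted, one row per day.
    Returns (day, bytes_30d) pairs matching the SQL frame
    'rows between 29 preceding and current row'."""
    out, window, total = [], [], 0
    for day, bytes_written in daily:
        window.append(bytes_written)
        total += bytes_written
        if len(window) > 30:
            total -= window.pop(0)  # slide the 30-row window
        out.append((day, total))
    return out
```

Note that both the SQL and this sketch use row-based frames, so a missing day shrinks the effective calendar window; emit zero-rows for idle days if that matters.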

**B) Peak concurrency**

```sql
-- approximate: count overlapping running jobs at minute resolution
select deployment_id, bucket_minute, max(concurrency) as peak
from (
  select a.deployment_id,
         date_trunc('minute', g.bucket) as bucket_minute,
         count(*) as concurrency
  from job j
  join agent a on a.agent_id = j.agent_id
  join generate_series(j.started_at, j.ended_at, interval '1 minute') as g(bucket) on true
  where j.status in ('running','succeeded')
  group by 1, 2
) t
group by 1, 2
order by peak desc
limit 1;
```
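
An exact alternative to the minute-resolution approximation is an event sweep over job intervals; a sketch (the input shape, a list of `(started_at, ended_at)` pairs, is assumed):

```python
def peak_concurrency(intervals):
    """Exact peak of overlapping [start, end) intervals via a sweep line."""
    events = []
    for start, end in intervals:
        events.append((start, 1))   # job starts
        events.append((end, -1))    # job ends
    # sort ends before starts at the same instant: half-open intervals
    events.sort(key=lambda e: (e[0], e[1]))
    peak = cur = 0
    for _, delta in events:
        cur += delta
        peak = max(peak, cur)
    return peak
```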

**C) “Who to upsell this week?”**

```sql
-- assumes QUOTA_NEARING alerts record the quota name in context->>'quota'
select d.deployment_id,
       sum(case when a.context->>'quota' = 'context_bytes' then 1 else 0 end) as context_near,
       sum(case when a.context->>'quota' = 'temporal_queries' then 1 else 0 end) as temporal_near
from alert a
join deployment d on d.deployment_id = a.deployment_id
where a.code = 'QUOTA_NEARING'
  and a.created_at > now() - interval '7 days'
group by 1
order by sum(case when a.context->>'quota' in ('context_bytes','temporal_queries') then 1 else 0 end) desc
limit 25;
```
---

## 5) API surfaces (local only by default)

**Ingest**

- `POST /v1/ingest/usage` : accepts `kaching.v1.usage` (above)
- `POST /v1/ingest/job-status` : minimal heartbeat/status updates

**Dashboards**

- `GET /v1/usage/daily?deployment_id=...&from=...&to=...`
- `GET /v1/limits/effective?deployment_id=...` (license + quotas merged)
- `GET /v1/alerts?deployment_id=...`
- `GET /v1/suggestions?deployment_id=...`

**Admin**

- `POST /v1/quota` (create/update)
- `POST /v1/budget`
- `POST /v1/license/activate` (uploads the vendor-signed blob)
- `POST /v1/tokens` (issue/rotate ingest tokens)

Auth: local mTLS + `Authorization: Bearer <token>`; all responses cacheable for 60 s.

---

## 6) Deployment architecture (on-prem first)

- **KACHING Core** (Go/Rust service)
    - HTTP ingest + API
    - Aggregator worker
    - Scheduler (rollups, alerts)
    - Optional “HQ Sync” module (disabled by default)
- **State**
    - PostgreSQL 15+ (enable `pg_partman` / native range partitioning)
    - Redis/NATS event buffer (optional; fall back to `COPY`-on-commit if absent)
- **Packaging**
    - systemd unit or Docker Compose
    - Helm chart for k8s clusters (nodeSelector for DB locality)
- **Resilience**
    - Backpressure: agents buffer to disk (bounded queue) if ingest is unavailable
    - Idempotent writes via `correlation_id`
    - Daily VACUUM/ANALYZE; weekly `REFRESH MATERIALIZED VIEW mv_daily_activity`

---

## 7) Retention & performance

- **Raw `usage_sample`**: 14–30 days hot; downsample into rollups; archive to Parquet on local object storage (MinIO) monthly.
- **Rollups**: keep 24 months.
- Partition keys: `usage_sample.observed_at` (daily), `usage_rollup_hourly.bucket_start` (daily).
- Use `BRIN` indexes for time partitions; `btree` for FK lookups.

---

## 8) License enforcement (light-touch)

- The **signed license** (`license.signed_blob`) includes: plan id, seat/node caps, expiry, feature bitset, signature.
- Enforce **soft** by default (warn + suggest); go **hard** only when explicitly configured (enterprises ask for this).
- Local-only check; never phones home unless HQ Sync is enabled.

**Prove governance, not just spend.** KACHING rolls up per-job context ops, model tokens, and concurrency into org-level signals, compares them against **quotas/budgets**, and emits policy actions (warn/throttle/block/fallback). This is how we enforce _runtime_ guardrails in a way boards and auditors can verify.

---

## 9) Budget actions (runtime knobs CHORUS can use)

When KACHING emits `BUDGET_EXCEEDED` with `action`:

- `warn`: post in the CHORUS banner.
- `throttle`: cap orchestration concurrency at N.
- `block`: reject new jobs in that scope.
- `fallback_model`: instruct the agent to switch to a cheaper local model; pass a `policy_decision` payload back via the CHORUS control channel.
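
These actions can be dispatched from a small policy map; a sketch (the handler behaviors and the `controls` dict standing in for the CHORUS control channel are assumptions):

```python
def apply_budget_action(action, scope, controls):
    """Map a BUDGET_EXCEEDED policy action onto runtime controls."""
    if action == "warn":
        controls.setdefault("banners", []).append(f"budget exceeded: {scope}")
    elif action == "throttle":
        # halve the concurrency cap (illustrative policy)
        controls["max_concurrency"] = controls.get("max_concurrency", 8) // 2
    elif action == "block":
        controls.setdefault("blocked_scopes", set()).add(scope)
    elif action == "fallback_model":
        controls["policy_decision"] = {"scope": scope, "model": "cheaper-local"}
    else:
        raise ValueError(f"unknown action: {action}")
    return controls
```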
---

## 10) UX hooks (what users see)

- **Tier Utilization card**: seats, nodes, temporal queries, context volume; sparkline + % of cap.
- **Bottleneck callouts**: “Temporal queries blocked 3× this week.”
- **Clear upgrade CTA**: shows _concrete deltas_ (“+3× context window, +multi-site federation”).

---

## 11) Minimal agent SDK shim

**Config (YAML)**

```yaml
kaching:
  ingest_url: https://kaching.local/v1/ingest/usage
  token: ${KACHING_TOKEN}
  key_id: agentkey-02
  key_path: /etc/chorus/agent_ed25519
  flush_interval_ms: 2000
  max_batch: 200
  dedupe_key: correlation_id
  redact:
    ucxl_addr_segments: ["credentials", "secrets"]
```

**Agent integration (pseudo-code)**

```python
with Job("ucxl://.../report.md") as job:
    # ... do work ...
    kaching.emit_usage(job_id=job.id,
                       cpu_seconds=cpu,
                       gpu_seconds=gpu,
                       context_reads=reads,
                       context_writes=writes,
                       model_name=model,
                       tokens_in=tin, tokens_out=tout)
```
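
Under the hood the shim buffers events and flushes on `max_batch` or `flush_interval_ms`; a minimal stdlib sketch (the `send` callback stands in for the HTTP POST, and the class shape is an assumption, not the actual SDK):

```python
import time

class UsageBuffer:
    """Buffers usage events; flushes when max_batch is reached or
    flush_interval_ms has elapsed since the last flush."""
    def __init__(self, send, max_batch=200, flush_interval_ms=2000,
                 clock=time.monotonic):
        self.send = send
        self.max_batch = max_batch
        self.flush_interval = flush_interval_ms / 1000.0
        self.clock = clock
        self.buf = []
        self.last_flush = clock()

    def emit(self, event):
        self.buf.append(event)
        if (len(self.buf) >= self.max_batch or
                self.clock() - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        if self.buf:
            self.send(self.buf)   # e.g., POST to kaching.ingest_url
            self.buf = []
        self.last_flush = self.clock()
```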

---

## 12) What this unlocks for pricing

- Flat annual licenses stay clean **because** you can prove usage & growth.
- Automated, evidence-based upsells (“you’re consistently at 88–95% of your context cap; Business tier gives +3× headroom and the federation you already need”).
- If you _ever_ add paid add-ons (e.g., optional hosted LLM fallback), the hooks (model tokens, provider) are already in the schema.

---

## 13) Implementation order (2-week sprint)

1. DDL + partitioning + ingest auth (Ed25519, token hash).
2. Ingest API + idempotent writes + agent shim.
3. Hourly/daily rollups + three alerts (`QUOTA_NEARING`, `BUDGET_EXCEEDED`, `NODES_LIMIT_EXCEEDED`).
4. Suggestion generator v1 (map to your three ladders).
5. Minimal web UI cards (tier utilization, alerts, CTA).
6. Optional: HQ Sync (batched, redacted).

---

**KACHING on Google Cloud** is a tight, secure, hybrid telemetry platform that your **on-prem CHORUS** installs can push into. Below is a practical blueprint you can build from today: services, IAM, network, data path, OpenAPI, and a Go reference skeleton.
# [[KACHING]] on GCP: production architecture

## Core services (minimal, proven stack)

- **Cloud Run (fully managed)**
    - `kaching-ingest` (public endpoint, locked by mutual auth + signed payloads)
    - `kaching-api` (dashboards/admin; private behind IAP)
    - `kaching-aggregator` (background rollups; also run as **Cloud Run jobs**)
- **Pub/Sub**
    - `usage-events` (raw events)
    - `dead-letter-usage` (DLQ with retry policy)
- **Cloud SQL for PostgreSQL 15+** (primary persistence; point-in-time recovery enabled)
- **Cloud Storage**
    - `kaching-archive` (monthly Parquet archives; optional customer exports)
- **BigQuery**
    - `kaching_analytics` dataset (hourly/daily rollups mirrored for analytics & Looker)
- **Secret Manager** (ingest tokens, per-org agent public keys, DB creds)
- **Cloud KMS** (CMEK for Cloud SQL, GCS, Pub/Sub; license signing keys)
- **Cloud Scheduler + Cloud Run jobs** (hourly/daily rollup + archive tasks)
- **Cloud Logging + Cloud Monitoring** (SLIs/SLOs; alerting policies)
- **Cloud Armor** (WAF in front of the HTTPS LB to Cloud Run)
- **Identity-Aware Proxy (IAP)** (protects the `kaching-api` admin UI)
- **VPC + Serverless VPC Access** (Cloud Run ↔ Cloud SQL private IP)

> Optional (later): **Apigee** if you need enterprise API governance; **Dataflow** if rollups outgrow SQL.

---

## Data flow (event path)

Telemetry is **signed + idempotent** (Ed25519, correlation IDs) and multi-tenant isolated (RLS/KMS/CMEK); redacted rollups can sync to HQ for cross-deployment analytics.

1. An **on-prem agent** completes a job → emits **signed** `kaching.v1.usage` JSON over HTTPS to `kaching-ingest`.
    - Auth: `Authorization: Bearer <ingest_token>` (hash validated)
    - Integrity: detached **Ed25519** signature over canonical JSON (agent key registered per org)
    - Idempotency: `job.correlation_id`
2. `kaching-ingest` performs **fast stateless checks** → pushes the message to **Pub/Sub `usage-events`** (attributes include `org_id`, `deployment_id`, `schema_version`, `event_ts`).
3. `kaching-aggregator` (Pub/Sub push/pull)
    - Writes **raw** rows to `usage_sample`/`job` in **Cloud SQL** (in a transaction)
    - Maintains **hourly/daily** rollups (SQL upserts)
    - Evaluates **quota/budget** → inserts `alert`/`suggestion`
    - Mirrors fresh rollups to **BigQuery** (streaming inserts or 5-min batches)
4. A **Cloud Run job** (hourly, daily)
    - VACUUMs/ANALYZEs hot partitions
    - Exports the prior month's raw samples to **GCS Parquet**
    - Refreshes materialized views
5. `kaching-api` serves dashboards (IAP-protected) + admin endpoints.

---

## Multi-tenancy & data isolation

- All rows are keyed by `org_id` + `deployment_id`; enforce **RLS (Row Level Security)** in PostgreSQL for any shared read paths.
- Each org gets a **KMS-wrapped secret** namespace (ingest tokens, agent public keys).
- CMEK on Cloud SQL & GCS; per-org key rings if you need tenant-scoped keys.

---

## Security model (pragmatic & strong)

- **Ingress**: HTTPS LB → Cloud Armor → Cloud Run (`kaching-ingest`).
- **mTLS (optional)**: if you control agent certs, terminate mTLS at the LB; otherwise rely on:
    - Bearer **ingest token** (DB-stored bcrypt/argon2 hash)
    - **Ed25519** signature (payload integrity; replay window ≤ 5 min)
    - **Idempotency** via a `(org_id, correlation_id)` unique index
- **Secrets**: only via Secret Manager; short-lived Cloud SQL IAM tokens (no static passwords in code).
- **Least-privilege IAM** service accounts (see below).

---

## IAM layout (service accounts & roles)

- `sa-kaching-ingest`
    - `roles/pubsub.publisher` (to `usage-events`)
    - `roles/secretmanager.secretAccessor` (read token pepper + org pubkeys)
- `sa-kaching-aggregator`
    - `roles/cloudsql.client`, `roles/secretmanager.secretAccessor`
    - `roles/pubsub.subscriber` (from `usage-events`)
    - `roles/storage.objectAdmin` (to write Parquet archives)
    - `roles/bigquery.dataEditor` (analytics dataset)
- `sa-kaching-api`
    - `roles/cloudsql.client`, `roles/secretmanager.secretAccessor`
    - Protected by **IAP**; org admins authenticated via Google Identity
- `sa-kaching-scheduler`
    - Invokes Cloud Run jobs; minimal runner roles
- KMS: grant each SA `cryptoKeyEncrypterDecrypter` on the CMEK keys.

---

## Database schema

Use the schema defined earlier (orgs, deployments, nodes, agents, jobs, usage_sample, rollups, license, quota, budget, alert, suggestion, tokens, etc.) **unchanged**—it fits Cloud SQL.
Add:

```sql
-- RLS example (read-only API role sees only its org).
-- usage_rollup_daily has no org_id column, so derive the org via deployment.
alter table usage_rollup_daily enable row level security;
create policy org_isolation on usage_rollup_daily
using (deployment_id in (
  select deployment_id from deployment
  where org_id = current_setting('app.current_org')::uuid
));
```

Your API runs `set local app.current_org = '<org-uuid>';` inside the request transaction after auth.

---

## OpenAPI (ingest + suggestions)

```yaml
openapi: 3.0.3
info:
  title: KACHING API
  version: 1.0.0
servers:
  - url: https://ingest.kaching.yourdomain.com
paths:
  /v1/ingest/usage:
    post:
      security: [{ IngestToken: [] }]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/KachingUsageV1'
      responses:
        '202': { description: accepted }
        '400': { description: bad request }
        '401': { description: unauthorized }
        '409': { description: duplicate (idempotent) }
  /v1/suggestions:
    get:
      security: [{ IAPAuth: [] }]
      parameters:
        - in: query
          name: deployment_id
          required: true
          schema: { type: string, format: uuid }
      responses:
        '200':
          description: list
          content:
            application/json:
              schema:
                type: array
                items: { $ref: '#/components/schemas/Suggestion' }
components:
  securitySchemes:
    IngestToken:
      type: http
      scheme: bearer
    IAPAuth:
      type: http
      scheme: bearer
  schemas:
    KachingUsageV1:
      type: object
      required: [schema, deployment_id, agent, node, job, metrics, sig]
      properties:
        schema: { type: string, enum: ["kaching.v1.usage"] }
        deployment_id: { type: string, format: uuid }
        agent: { type: object, required: [agent_id, agent_type] }
        node: { type: object, required: [node_id, hostname] }
        job: { type: object, required: [job_id, ucxl_addr, started_at, status] }
        metrics: { type: object }
        llm: { type: object }
        sig:
          type: object
          required: [algo, key_id, signature]
          properties:
            algo: { type: string, enum: ["ed25519"] }
            key_id: { type: string }
            signature: { type: string } # base64
    Suggestion:
      type: object
      properties:
        suggestion_id: { type: string, format: uuid }
        kind: { type: string }
        rationale: { type: string }
        target_plan: { type: string }
        diffs: { type: object }
```

---

## Go reference: Cloud Run **ingest** handler (concise skeleton)

```go
package main

import (
	"context"
	"crypto/ed25519"
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
)

type Usage struct {
	Schema       string `json:"schema"`
	DeploymentID string `json:"deployment_id"`
	Agent        struct {
		AgentID   string `json:"agent_id"`
		AgentType string `json:"agent_type"`
		Version   string `json:"version"`
	} `json:"agent"`
	Node struct {
		NodeID   string `json:"node_id"`
		Hostname string `json:"hostname"`
		HwClass  string `json:"hw_class"`
	} `json:"node"`
	Job struct {
		JobID         string     `json:"job_id"`
		CorrelationID string     `json:"correlation_id"`
		UCXLAddr      string     `json:"ucxl_addr"`
		StartedAt     time.Time  `json:"started_at"`
		EndedAt       *time.Time `json:"ended_at"`
		Status        string     `json:"status"`
		Meta          any        `json:"meta"`
	} `json:"job"`
	Metrics any `json:"metrics"`
	LLM     any `json:"llm"`
	Sig     struct {
		Algo      string `json:"algo"`
		KeyID     string `json:"key_id"`
		Signature string `json:"signature"`
	} `json:"sig"`
}

var topic *pubsub.Topic

func main() {
	ctx := context.Background()
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	topicName := os.Getenv("PUBSUB_TOPIC") // usage-events
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatal(err)
	}
	topic = client.Topic(topicName)
	http.HandleFunc("/v1/ingest/usage", handleIngest)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleIngest(w http.ResponseWriter, r *http.Request) {
	// 1) Auth: bearer token
	tok := r.Header.Get("Authorization")
	if !validateIngestToken(tok) {
		http.Error(w, "unauthorized", http.StatusUnauthorized)
		return
	}
	// 2) Parse
	var u Usage
	dec := json.NewDecoder(r.Body)
	dec.DisallowUnknownFields()
	if err := dec.Decode(&u); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if u.Schema != "kaching.v1.usage" {
		http.Error(w, "bad schema", http.StatusBadRequest)
		return
	}
	// 3) Verify signature over canonical JSON (client signs the payload without "sig")
	if !verifySignature(r.Context(), u) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	// 4) Publish to Pub/Sub with ordering key = deployment_id
	b, _ := json.Marshal(u)
	res := topic.Publish(r.Context(), &pubsub.Message{
		Data:        b,
		OrderingKey: u.DeploymentID,
		Attributes: map[string]string{
			"deployment_id": u.DeploymentID,
			"agent_type":    u.Agent.AgentType,
			"schema":        u.Schema,
		},
	})
	if _, err := res.Get(r.Context()); err != nil {
		http.Error(w, "queue error", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

func validateIngestToken(hdr string) bool {
	// Expect "Bearer <token>"; look up the hashed value (Secret Manager or
	// Redis) and compare in constant time.
	// extractBearer and sha256Hex are small helpers, omitted for brevity.
	want := os.Getenv("INGEST_TOKEN_HASH") // e.g., sha256 hex
	got := extractBearer(hdr)
	sum := sha256Hex(got)
	return subtle.ConstantTimeCompare([]byte(sum), []byte(want)) == 1
}

func verifySignature(ctx context.Context, u Usage) bool {
	// Look up org/deployment → key_id → ed25519 public key (Secret Manager).
	// fetchEd25519Pub and canonicalJSON are helpers, omitted for brevity.
	pub := fetchEd25519Pub(ctx, u.Sig.KeyID)
	sigBytes, _ := base64.StdEncoding.DecodeString(u.Sig.Signature)
	payload := canonicalJSON(u, true /* excludeSig */)
	return ed25519.Verify(pub, payload, sigBytes)
}
```

> Notes
> • Enable **ordered delivery** on Pub/Sub if you need strict per-deployment ordering.
> • For signature canonicalization, fix field order and whitespace (e.g., RFC 8785 JSON Canonicalization Scheme).
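
The canonicalization step can be approximated in a few lines of sorted-key, compact-separator serialization; a sketch (close to, but not a full implementation of, RFC 8785: it omits 8785's number-formatting rules):

```python
import json

def canonical_payload(event):
    """Serialize the event without its 'sig' field, with sorted keys and
    no insignificant whitespace, so signer and verifier hash identical bytes."""
    body = {k: v for k, v in event.items() if k != "sig"}
    return json.dumps(body, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")
```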
---

## Rollups & thresholds (Cloud Run job)

- **Hourly job**:
  - UPSERT into `usage_rollup_hourly`, grouped by `(deployment_id, bucket_start, agent_type)`
  - Evaluate **soft/hard quotas**; write `alert`/`suggestion` rows
  - Stream the hourly snapshot into **BigQuery** (for Looker dashboards)
- **Daily job**:
  - UPSERT `usage_rollup_daily`
  - Export aging `usage_sample` partitions → **GCS Parquet**
  - `VACUUM/ANALYZE` current partitions
  - Refresh materialized views

Run both with **Cloud Scheduler** → **Cloud Run jobs** (with retries on failure).

---

## Networking & regions

- Put everything in one **primary region** close to your customers (e.g., `australia-southeast1`).
- Turn on **PITR** for Cloud SQL; add a read replica in a nearby region if needed.
- If you require **egress locality** for data sovereignty, pin the **GCS** buckets and **BigQuery** datasets to the same region and enforce org-scoped KMS keys.

---

## Observability & SLOs

- **SLI ideas**:
  - Ingest success rate ≥ 99.9% (5-minute windows)
  - Pub/Sub → DB end-to-end latency p95 < 2 min
  - Aggregator error rate < 0.1%
- **Dashboards** (Cloud Monitoring):
  - Requests by status, Pub/Sub undelivered messages, Cloud Run CPU/mem, Cloud SQL CPU/IO wait
- **Alerts**:
  - Pub/Sub backlog above threshold for 10 min
  - Cloud Run 5xx rate > 1% for 5 min
  - Burst of Cloud SQL connection errors
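The success-rate SLI above reduces to a per-window ratio check; a minimal sketch (function names and the no-traffic convention are illustrative):

```go
package main

import "fmt"

// successRate computes an availability SLI over one evaluation window.
func successRate(ok, total int) float64 {
	if total == 0 {
		return 1.0 // no traffic: treat the window as healthy
	}
	return float64(ok) / float64(total)
}

// breached reports whether the window misses the SLO target (e.g., 0.999
// for the 99.9% ingest success rate above).
func breached(ok, total int, target float64) bool {
	return successRate(ok, total) < target
}

func main() {
	fmt.Println(breached(99905, 100000, 0.999)) // false: 99.905% meets 99.9%
	fmt.Println(breached(99850, 100000, 0.999)) // true: 99.85% misses it
}
```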

---

## Terraform pointers (resource sketch)

```hcl
resource "google_pubsub_topic" "usage" {
  name                       = "usage-events"
  message_retention_duration = "604800s"
  kms_key_name               = google_kms_crypto_key.pubsub.id
}

resource "google_sql_database_instance" "pg" {
  name                = "kaching-pg"
  database_version    = "POSTGRES_15"
  region              = "australia-southeast1"
  encryption_key_name = google_kms_crypto_key.sql.id

  settings {
    tier = "db-custom-2-7680" # pick to taste

    backup_configuration {
      enabled                        = true # PITR requires backups on
      point_in_time_recovery_enabled = true
    }

    ip_configuration {
      private_network = google_compute_network.vpc.id
    }
  }
}

resource "google_cloud_run_service" "ingest" {
  name     = "kaching-ingest"
  location = "australia-southeast1"

  template {
    spec {
      containers {
        image = "gcr.io/PROJECT/kaching-ingest:latest"

        env {
          name  = "PUBSUB_TOPIC"
          value = google_pubsub_topic.usage.name
        }
      }
      service_account_name = google_service_account.ingest.email
    }
  }
}
```

---

## License issuance on GCP

- Keep a **vendor signing key** in **Cloud KMS**; sign a compact license JWT or binary blob containing `org_id`, `plan_id`, `valid_from/to`, a feature bitset, and seat/node caps.
- `kaching-api` validates licenses locally with the **public key**; no call-home needed.
- Store customer licenses in Cloud SQL; deliver the file via your portal.

---

## Migration path from pure on-prem KACHING

- Start with this hosted KACHING; if a customer requires an **air-gapped** install, deploy the same services via **GKE on-prem** or a **single-VM** bundle (Postgres + NATS + the same services).
- Keep the event schema 1:1 so both modes stay interchangeable.

---

## What to build first (so you can ship)

1. **Ingest** (Cloud Run) + **Pub/Sub** + **aggregator** writing to **Cloud SQL** (raw + hourly).
2. **Threshold engine** (soft quota → `alert` + `suggestion`) mapped to your three price ladders.
3. **API** read endpoints + IAP-protected minimal UI tiles: tier utilisation, recent alerts, suggested upgrade.
4. **Daily job** for Parquet export + Looker Studio over BigQuery for internal analytics.
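The threshold engine in step 2 boils down to a two-level comparison; a sketch (the names and the 80% soft threshold are illustrative):

```go
package main

import "fmt"

// Level of a quota breach: Soft drives a suggestion/upgrade nudge,
// Hard drives an enforcement alert.
type Level int

const (
	OK Level = iota
	Soft
	Hard
)

// evaluate compares usage against a quota, with the soft threshold set as
// a fraction of the hard limit (e.g., 0.8 → nudge at 80% utilisation).
func evaluate(used, limit, softFrac float64) Level {
	switch {
	case used >= limit:
		return Hard
	case used >= limit*softFrac:
		return Soft
	default:
		return OK
	}
}

func main() {
	fmt.Println(evaluate(50, 100, 0.8) == OK)    // true: well under quota
	fmt.Println(evaluate(85, 100, 0.8) == Soft)  // true: write a suggestion
	fmt.Println(evaluate(120, 100, 0.8) == Hard) // true: write an alert
}
```

The hourly job runs this per `(deployment_id, quota)` pair and writes the resulting `alert`/`suggestion` rows mapped to the price ladders.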
## TODO

- Ingestion gateways: implement a consistent event schema and authenticated ingestion across CHORUS/WHOOSH/RUSTLE/SLURP; add collectors per agent type.
- Policy enforcement: apply license/plan/quota limits in orchestration paths (WHOOSH), with alerts and upgrade nudges.
- Rollups to HQ: optional push of redacted rollups with privacy controls; add unit/integration tests.
- Dashboard completeness: wire the cross-org/deployment/node views to live telemetry with filtering and drill-down.