Below is a drop-in, on-prem-friendly telemetry + pricing brain for CHORUS. It's opinionated, PostgreSQL-first, offline-capable, and designed to drive upsell _without_ metered compute.
# KACHING: design spec (schema + event flow)
## What it does
- Collects per-job agent metrics (CPU/GPU secs, RAM peak, I/O, context ops).
- Rolls up to hourly/daily org-level usage.
- Compares against **license/tier limits** and **budgets**.
- Emits upgrade suggestions + alerts.
- (Optional) Pushes redacted rollups to HQ for global analytics.
---
## 1) Data model (PostgreSQL)
**Core entities**
- `org` ← a customer (keep this even on single-tenant installs; it future-proofs the schema).
- `deployment` ← an installation of CHORUS under an org.
- `node` ← physical/virtual machine running agents.
- `agent` ← a logical worker (e.g., `SLURP_ingest`, `UCXL_resolver`).
- `job` ← unit of work (has UCXL address).
- `usage_sample` ← raw metrics per job.
- `usage_rollup_*` ← materialized summaries.
- `license`, `pricing_plan`, `quota`, `budget` ← monetisation controls.
- `feature_flag` ← enables premium capabilities.
- `alert`, `suggestion` ← user-facing nudges/notifications.
- `api_key`, `ingest_token` ← auth for push events.
> Partition time-series tables by day; use `pg_partman` or native declarative partitioning.
**DDL (core)**
```sql
-- orgs & deployments
create table org (
  org_id uuid primary key,
  name text not null,
  created_at timestamptz not null default now()
);

create table deployment (
  deployment_id uuid primary key,
  org_id uuid not null references org(org_id),
  name text not null,
  timezone text not null default 'UTC',
  created_at timestamptz not null default now()
);

-- nodes & agents
create table node (
  node_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  hostname text not null,
  hw_class text, -- e.g., "i5-12400 + RTX3060"
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

create table agent (
  agent_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  node_id uuid references node(node_id),
  agent_type text not null, -- e.g., "SLURP_ingest"
  version text,
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

-- jobs
create table job (
  job_id uuid primary key,
  agent_id uuid not null references agent(agent_id),
  ucxl_addr text not null,
  started_at timestamptz not null,
  ended_at timestamptz,
  status text not null check (status in ('running','succeeded','failed','canceled')),
  correlation_id text, -- idempotency key from caller
  meta jsonb not null default '{}'
);

-- raw usage (partition by day on observed_at)
create table usage_sample (
  sample_id uuid primary key,
  job_id uuid not null references job(job_id),
  observed_at timestamptz not null,
  cpu_seconds numeric(18,6) not null default 0,
  gpu_seconds numeric(18,6) not null default 0,
  ram_mb_peak numeric(18,3) not null default 0,
  disk_io_mb numeric(18,3) not null default 0,
  net_in_mb numeric(18,3) not null default 0,
  net_out_mb numeric(18,3) not null default 0,
  context_reads integer not null default 0,
  context_writes integer not null default 0,
  context_bytes_read bigint not null default 0,
  context_bytes_written bigint not null default 0,
  model_name text, -- if any LLM was used (local/cloud)
  model_tokens_in bigint default 0,
  model_tokens_out bigint default 0,
  flags jsonb not null default '{}'
);

-- rollups (hourly & daily)
create table usage_rollup_hourly (
  deployment_id uuid not null references deployment(deployment_id),
  bucket_start timestamptz not null, -- aligned to hour
  agent_type text not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  ram_mb_peak numeric(18,3) not null,
  net_in_mb numeric(18,3) not null,
  net_out_mb numeric(18,3) not null,
  context_reads bigint not null,
  context_writes bigint not null,
  context_bytes_read bigint not null,
  context_bytes_written bigint not null,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  jobs_succeeded bigint not null,
  jobs_failed bigint not null,
  primary key (deployment_id, bucket_start, agent_type)
);

create table usage_rollup_daily (
  deployment_id uuid not null references deployment(deployment_id),
  day date not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  context_bytes_written bigint not null,
  seats_active integer not null default 0,
  nodes_active integer not null default 0,
  orchestration_peak_concurrency integer not null default 0,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  primary key (deployment_id, day)
);

-- licensing / pricing
create table pricing_plan (
  plan_id text primary key, -- e.g., 'SMB_Pro', 'Mid_Business'
  meta jsonb not null -- published plan limits/features
);

create table license (
  license_id uuid primary key,
  org_id uuid not null references org(org_id),
  plan_id text not null references pricing_plan(plan_id),
  seats_limit integer,
  nodes_limit integer,
  features jsonb not null, -- e.g., {"temporal_nav": true, "federation": false}
  valid_from date not null,
  valid_to date not null,
  signed_blob bytea not null -- vendor-signed license
);

create table quota (
  quota_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  name text not null, -- e.g., 'context_bytes', 'temporal_queries'
  period text not null, -- 'daily','monthly','rolling_30d'
  hard_limit bigint, -- null => unlimited in plan
  soft_threshold bigint, -- trigger suggestion at e.g. 80%
  created_at timestamptz not null default now()
);

create table budget (
  budget_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  scope text not null, -- 'ingest','reason','orchestration'
  period text not null, -- 'daily','weekly','monthly'
  limit_units numeric(18,6) not null, -- arbitrary unit (e.g., cpu_seconds)
  action text not null, -- 'warn','throttle','block','fallback_model'
  created_at timestamptz not null default now()
);

-- alerts & suggestions
create table alert (
  alert_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  severity text not null check (severity in ('info','warn','error')),
  code text not null, -- e.g., 'QUOTA_NEARING'
  message text not null,
  context jsonb not null default '{}',
  acknowledged boolean not null default false
);

create table suggestion (
  suggestion_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  kind text not null, -- 'upgrade_tier','enable_feature','tune_pipeline'
  rationale text not null,
  target_plan text, -- suggested plan id
  diffs jsonb not null default '{}', -- what they gain quantitatively
  shown_to_user boolean not null default false,
  accepted boolean
);

-- auth for ingestion
create table ingest_token (
  token_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  token_hash bytea not null, -- store only hash
  scopes text[] not null, -- ['ingest:usage','ingest:jobs']
  created_at timestamptz not null default now(),
  expires_at timestamptz
);

-- convenience: daily seat/node activity
create materialized view mv_daily_activity as
select
  d.deployment_id,
  date_trunc('day', j.started_at) as day,
  count(distinct a.agent_id) filter (where j.status = 'succeeded') as agents_active,
  count(distinct a.node_id) as nodes_active
from job j
join agent a on a.agent_id = j.agent_id
join deployment d on d.deployment_id = a.deployment_id
group by 1, 2;
```
Indexes you'll want: `(job.agent_id)`, `(usage_sample.job_id)`, `(usage_sample.observed_at)`, `(usage_rollup_hourly.bucket_start)`, `(alert.deployment_id, created_at desc)`, `(quota.deployment_id, name)`.
---
## 2) Telemetry event flow
**On each agent run (end-of-job or periodic heartbeat):**
1. Agent emits a **signed** JSON payload to the local KACHING Ingest API.
2. KACHING validates token + signature, dedups via `correlation_id`.
3. Persist raw `job` + `usage_sample`.
4. Stream to **Aggregator** worker (local queue: Redis Streams/NATS JetStream).
5. Aggregator updates hourly/daily rollups and checks **quota/budget**.
6. If thresholds breached → create `alert` + `suggestion`.
7. (Optional) Post redacted rollups to HQ (batch, e.g., every 6h) over mTLS.
**Event schema (agent → ingest)**
```json
{
  "schema": "kaching.v1.usage",
  "deployment_id": "6d0b1bcb-...-9c9d",
  "agent": {
    "agent_id": "0f32...ab",
    "agent_type": "SLURP_ingest",
    "version": "1.7.3"
  },
  "node": {
    "node_id": "4aa0...de",
    "hostname": "raven01",
    "hw_class": "Ryzen 5600 + RTX 3060 12G"
  },
  "job": {
    "job_id": "a3e8...90",
    "correlation_id": "ucxl:alpha:2025-08-14T08:03:05Z:42",
    "ucxl_addr": "ucxl://any:finance@project:alpha/*/report.md",
    "started_at": "2025-08-14T08:03:00Z",
    "ended_at": "2025-08-14T08:03:05Z",
    "status": "succeeded",
    "meta": { "workflow": "SLURP", "source": "Confluence" }
  },
  "metrics": {
    "cpu_seconds": 48.5,
    "gpu_seconds": 3.1,
    "ram_mb_peak": 520,
    "disk_io_mb": 18.2,
    "net_in_mb": 0.5,
    "net_out_mb": 1.2,
    "context_reads": 114,
    "context_writes": 7,
    "context_bytes_read": 812345,
    "context_bytes_written": 1280033
  },
  "llm": {
    "model_name": "llama3.1:8b-q4",
    "tokens_in": 1820,
    "tokens_out": 740,
    "provider": "local"
  },
  "sig": {
    "algo": "ed25519",
    "key_id": "agentkey-02",
    "signature": "<base64>"
  }
}
```
**Idempotency rules**
- `job.correlation_id` required; duplicates are `UPSERT`ed (last one wins if timestamps advance).
- Clock skew is tolerated: if `ended_at` precedes `started_at` by a small delta, accept the sample and flag it.
**Security**
- mTLS between agents and KACHING.
- Per-deployment `ingest_token` with narrow scopes.
- Payload signature checked against registered `key_id` (rotate quarterly).
- PII-free by default; redact `ucxl_addr` segments via local policy if needed.
---
## 3) Aggregation & thresholds
**Rollup jobs**
- Hourly: group by `deployment_id, agent_type, bucket_start`.
- Daily: group by `deployment_id, day`.
- Maintain **peak orchestration concurrency** using job overlap counts (interval tree or `btree_gist` on `(started_at, ended_at)`).
**Threshold engine**
- Load effective limits from `license.features` + `quota`.
- Example checks:
- Soft: `context_bytes_written (30d)` ≥ 80% of limit → `QUOTA_NEARING`.
    - Hard: `nodes_active` > `license.nodes_limit` → `HARD_BLOCK` (if policy says).
- Budget: `gpu_seconds (daily)` > budget → emit `BUDGET_EXCEEDED` with policy action (`fallback_model` etc).
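The soft/hard checks reduce to a small pure function; a sketch, with nil limits standing in for the nullable `quota` columns (alert codes match the examples above, everything else is illustrative):

```go
package main

import "fmt"

// checkQuota returns the alert codes a usage figure triggers:
// soft_threshold → QUOTA_NEARING, hard_limit → HARD_BLOCK.
// A nil limit means the plan leaves that dimension unlimited.
func checkQuota(used int64, soft, hard *int64) []string {
	var codes []string
	if hard != nil && used > *hard {
		codes = append(codes, "HARD_BLOCK")
	}
	if soft != nil && used >= *soft {
		codes = append(codes, "QUOTA_NEARING")
	}
	return codes
}

func main() {
	soft, hard := int64(80), int64(100)
	fmt.Println(checkQuota(85, &soft, &hard))  // nearing only
	fmt.Println(checkQuota(120, &soft, &hard)) // both codes fire
}
```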
**Suggestion generator**
- Map overages to plan ladder deltas (from your published `pricing_plan.meta`):
- “Upgrade Pro → Business: +3× context, +multi-site federation”
- Compute **quantitative deltas** (e.g., “your 30-day context is 1.8× current cap; Business raises cap to 3.0×”).
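The quantitative delta is just two ratios against the plan ladder; a sketch where the cap values stand in for figures published in `pricing_plan.meta`:

```go
package main

import "fmt"

// upgradeDelta quantifies a suggestion: how far 30-day usage sits
// over the current cap, and the headroom multiplier the target plan
// would provide.
func upgradeDelta(used30d, currentCap, targetCap int64) (overCap, headroom float64) {
	overCap = float64(used30d) / float64(currentCap)
	headroom = float64(targetCap) / float64(currentCap)
	return
}

func main() {
	over, head := upgradeDelta(1800, 1000, 3000)
	fmt.Printf("30-day usage is %.1fx current cap; target plan raises cap to %.1fx\n", over, head)
}
```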
---
## 4) Example queries that drive the UI/upsell
**A) Soft quota nearing (30-day rolling)**
```sql
with windowed as (
  select deployment_id,
         day,
         sum(context_bytes_written) over (
           partition by deployment_id
           order by day
           rows between 29 preceding and current row
         ) as bytes_30d
  from usage_rollup_daily
)
select w.deployment_id, w.day, w.bytes_30d, q.soft_threshold, q.hard_limit
from windowed w
join quota q
  on q.deployment_id = w.deployment_id
 and q.name = 'context_bytes'
 and q.period = 'rolling_30d'
where w.bytes_30d >= q.soft_threshold;
```
**B) Peak concurrency**
```sql
-- approximate: count overlapping jobs at minute resolution
select deployment_id, bucket_minute, max(concurrency) as peak
from (
  select a.deployment_id,
         date_trunc('minute', g.bucket) as bucket_minute,
         count(*) as concurrency
  from job j
  join agent a on a.agent_id = j.agent_id
  -- running jobs have no ended_at yet; treat them as open until now
  join generate_series(j.started_at, coalesce(j.ended_at, now()), interval '1 minute') as g(bucket) on true
  where j.status in ('running','succeeded')
  group by 1, 2
) t
group by 1, 2
order by peak desc
limit 1;
```
**C) “Who to upsell this week?”**
```sql
-- assumes alert.context records the quota name, e.g. {"quota": "context_bytes"}
select d.deployment_id,
       count(*) filter (where a.context->>'quota' = 'context_bytes') as context_near,
       count(*) filter (where a.context->>'quota' = 'temporal_queries') as temporal_near
from alert a
join deployment d on d.deployment_id = a.deployment_id
where a.code = 'QUOTA_NEARING' and a.created_at > now() - interval '7 days'
group by 1
order by count(*) desc
limit 25;
```
---
## 5) API surfaces (local only by default)
**Ingest**
- `POST /v1/ingest/usage` : accepts `kaching.v1.usage` (above)
- `POST /v1/ingest/job-status` : minimal heartbeat/status updates
**Dashboards**
- `GET /v1/usage/daily?deployment_id=...&from=...&to=...`
- `GET /v1/limits/effective?deployment_id=...` (license + quotas merged)
- `GET /v1/alerts?deployment_id=...`
- `GET /v1/suggestions?deployment_id=...`
**Admin**
- `POST /v1/quota` (create/update)
- `POST /v1/budget`
- `POST /v1/license/activate` (uploads vendor-signed blob)
- `POST /v1/tokens` (issue/rotate ingest tokens)
Auth: local mTLS + `Authorization: Bearer <token>`; all responses cacheable 60s.
---
## 6) Deployment architecture (on-prem first)
- **KACHING Core** (Go/Rust service)
- HTTP ingest + API
- Aggregator worker
- Scheduler (rollups, alerts)
- Optional “HQ Sync” module (disabled by default)
- **State**
- PostgreSQL 15+ (enable `pg_partman` / native range partitioning)
- Redis/NATS for event buffer (optional; fall back to `COPY`-on-commit if absent)
- **Packaging**
- Systemd unit or Docker Compose
- Helm chart for k8s clusters (nodeSelector for DB locality)
- **Resilience**
- Backpressure: agents buffer to disk (bounded queue) if ingest is unavailable
- Idempotent writes via `correlation_id`
- Daily VACUUM/ANALYZE; weekly `REFRESH MATERIALIZED VIEW mv_daily_activity`
---
## 7) Retention & performance
- **Raw `usage_sample`**: 14-30 days hot; downsample into rollups; archive to parquet on local object storage (MinIO) monthly.
- **Rollups**: keep 24 months.
- Partition keys: `usage_sample.observed_at` (daily), `usage_rollup_hourly.bucket_start` (daily).
- Use `BRIN` indexes for time partitions; `btree` for FK lookups.
---
## 8) License enforcement (light-touch)
- **Signed license** (`license.signed_blob`) includes: plan id, seat/node caps, expiry, feature bitset, signature.
- Enforce **soft** by default (warn + suggest), **hard** only when explicitly configured (enterprise asks).
- Local-only check; never phones home unless HQ Sync is enabled.
**Prove governance, not just spend.** KACHING rolls up per-job context ops, model tokens, and concurrency into org-level signals, compares against **quotas/budgets**, and emits policy actions (warn/throttle/block/fallback). This is how we enforce _runtime_ guardrails in a way boards and auditors can verify.
---
## 9) Budget actions (runtime knobs CHORUS can use)
When KACHING emits `BUDGET_EXCEEDED` with `action`:
- `warn`: post in CHORUS banner.
- `throttle`: cap orchestration concurrency at N.
- `block`: reject new jobs in that scope.
- `fallback_model`: instruct agent to switch to cheaper local model; pass a `policy_decision` payload back via CHORUS control channel.
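The action-to-decision mapping is a straightforward dispatch; a sketch in which the `decision` struct is a hypothetical shape for the `policy_decision` payload:

```go
package main

import "fmt"

// decision is an illustrative stand-in for the policy_decision
// payload sent back over the CHORUS control channel.
type decision struct {
	Banner         string
	MaxConcurrency int // 0 = unchanged
	RejectNew      bool
	FallbackModel  string
}

// applyBudgetAction maps a BUDGET_EXCEEDED policy action to a
// control decision for CHORUS.
func applyBudgetAction(action string, throttleTo int, fallback string) decision {
	switch action {
	case "warn":
		return decision{Banner: "budget exceeded"}
	case "throttle":
		return decision{MaxConcurrency: throttleTo}
	case "block":
		return decision{RejectNew: true}
	case "fallback_model":
		return decision{FallbackModel: fallback}
	}
	return decision{}
}

func main() {
	fmt.Printf("%+v\n", applyBudgetAction("throttle", 4, ""))
}
```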
---
## 10) UX hooks (what users see)
- **Tier Utilization card**: seats, nodes, temporal queries, context volume; sparkline + % of cap.
- **Bottleneck callouts**: “Temporal queries blocked 3× this week.”
- **Clear upgrade CTA**: shows _concrete deltas_ (“+3× context window, +multi-site federation”).
---
## 11) Minimal agent SDK shim
**Config (YAML)**
```yaml
kaching:
  ingest_url: https://kaching.local/v1/ingest/usage
  token: ${KACHING_TOKEN}
  key_id: agentkey-02
  key_path: /etc/chorus/agent_ed25519
  flush_interval_ms: 2000
  max_batch: 200
  dedupe_key: correlation_id
  redact:
    ucxl_addr_segments: ["credentials", "secrets"]
```
**Agent integration (pseudo-code)**
```python
with Job("ucxl://.../report.md") as job:
    # ... do work ...
    kaching.emit_usage(
        job_id=job.id,
        cpu_seconds=cpu,
        gpu_seconds=gpu,
        context_reads=reads,
        context_writes=writes,
        model_name=model,
        tokens_in=tin,
        tokens_out=tout,
    )
```
---
## 12) What this unlocks for pricing
- Flat annual licenses remain clean **because** you can prove usage & growth.
- Automated, evidence-based upsells ("you're consistently at 88-95% of context cap; Business tier gives +3× headroom and federation you already need").
- If you _ever_ add paid add-ons (e.g., optional hosted LLM fallback), the hooks (model tokens, provider) are already in the schema.
---
## 13) Implementation order (2-week sprint)
1. DDL + partitioning + ingest auth (ed25519, token hash).
2. Ingest API + idempotent writes + agent shim.
3. Hourly/daily rollups + three alerts (`QUOTA_NEARING`, `BUDGET_EXCEEDED`, `NODES_LIMIT_EXCEEDED`).
4. Suggestion generator v1 (map to your three ladders).
5. Minimal web UI cards (tier utilization, alerts, CTA).
6. Optional: HQ Sync (batched, redacted).
---
**KACHING on Google Cloud**: a tight, secure, hybrid telemetry platform that your **on-prem CHORUS** installs can push into. Below is a practical blueprint you can build from today: services, IAM, network, data path, OpenAPI, and a Go reference skeleton.
# [[KACHING]] on GCP: production architecture
## Core services (minimal, proven stack)
- **Cloud Run (fully managed)**
- `kaching-ingest` (public endpoint, locked by mutual auth + signed payloads)
- `kaching-api` (dashboards/admin; private behind IAP)
- `kaching-aggregator` (background rollups; also run as **Cloud Run jobs**)
- **Pub/Sub**
- `usage-events` (raw events)
- `dead-letter-usage` (DLQ with retry policy)
- **Cloud SQL for PostgreSQL 15+** (primary persistence; point-in-time recovery enabled)
- **Cloud Storage**
- `kaching-archive` (monthly Parquet archives; optional customer exports)
- **BigQuery**
- `kaching_analytics` dataset (hourly/daily rollups mirrored for analytics & Looker)
- **Secret Manager** (ingest tokens, per-org agent public keys, DB creds)
- **Cloud KMS** (CMEK for Cloud SQL, GCS, Pub/Sub; license signing keys)
- **Cloud Scheduler + Cloud Run jobs** (hourly/daily rollup + archive tasks)
- **Cloud Logging + Cloud Monitoring** (SLIs/SLOs; alerting policies)
- **Cloud Armor** (WAF in front of HTTPS LB to Cloud Run)
- **Identity-Aware Proxy (IAP)** (protect `kaching-api` admin UI)
- **VPC + Serverless VPC Access** (Cloud Run ↔ Cloud SQL private IP)
> Optional (later): **Apigee** if you need enterprise API governance; **Dataflow** if rollups outgrow SQL.
---
## Data flow (event path)
Telemetry is **signed + idempotent** (ed25519, correlation IDs) and multi-tenant isolated (RLS/KMS/CMEK); redacted rollups can sync to HQ for cross-deployment analytics.
1. **On-prem agent** completes a job → emits **signed** `kaching.v1.usage` JSON over HTTPS to `kaching-ingest`.
- Auth: `Authorization: Bearer <ingest_token>` (hash validated)
- Integrity: detached **Ed25519** signature over canonical JSON (agent key registered per org)
- Idempotency: `job.correlation_id`
2. `kaching-ingest` performs **fast stateless checks** → pushes message to **Pub/Sub `usage-events`** (attributes include `org_id`, `deployment_id`, `schema_version`, `event_ts`).
3. `kaching-aggregator` (Pub/Sub push/pull)
- Writes **raw** to `usage_sample`/`job` in **Cloud SQL** (in txn)
- Maintains **hourly/daily** rollups (SQL upserts)
- Evaluates **quota/budget** → inserts `alert`/`suggestion`
- Mirrors fresh rollups to **BigQuery** (streaming inserts or 5-min batch)
4. **Cloud Run job** (hourly, daily)
- VACUUM/ANALYZE hot partitions
- Export prior month raw to **GCS Parquet**
- Advance materialized views
5. `kaching-api` serves dashboards (IAP-protected) + admin endpoints.
---
## Multi-tenancy & data isolation
- All rows keyed by `org_id` + `deployment_id`; enforce **RLS (Row Level Security)** in PostgreSQL for any shared read paths.
- Each org has a **KMS-wrapped secret** namespace (ingest tokens, agent public keys).
- CMEK on Cloud SQL & GCS; per-org key rings if you need tenant keying.
---
## Security model (pragmatic & strong)
- **Ingress**: HTTPS LB → Cloud Armor → Cloud Run (`kaching-ingest`).
- **mTLS (optional)**: If you control agent certs, terminate mTLS at the LB; otherwise rely on:
- Bearer **ingest token** (DB-stored bcrypt/argon2 hash)
- **Ed25519** signature (payload integrity, replay window ≤ 5 min)
- **Idempotency** via `(org_id, correlation_id)` unique index
- **Secrets**: only via Secret Manager; short-lived Cloud SQL IAM tokens (no static passwords in code).
- **Least-privilege IAM** service accounts (see below).
---
## IAM layout (service accounts & roles)
- `sa-kaching-ingest`
- `roles/pubsub.publisher` (to `usage-events`)
- `roles/secretmanager.secretAccessor` (read token pepper + org pubkeys)
- `sa-kaching-aggregator`
- `roles/cloudsql.client`, `roles/secretmanager.secretAccessor`
- `roles/pubsub.subscriber` (from `usage-events`)
- `roles/storage.objectAdmin` (to write Parquet archives)
- `roles/bigquery.dataEditor` (analytics dataset)
- `sa-kaching-api`
- `roles/cloudsql.client`, `roles/secretmanager.secretAccessor`
- Protected by **IAP**; org admins authenticated via Google Identity
- `sa-kaching-scheduler`
- Invokes Cloud Run jobs; minimal runner roles
- KMS: grant each SA `cryptoKeyEncrypterDecrypter` on CMEK keys.
---
## Database schema
Use the schema we defined earlier (orgs, deployments, nodes, agents, jobs, usage_sample, rollups, license, quota, budget, alert, suggestion, tokens, etc.) **unchanged**—it fits Cloud SQL.
Add:
```sql
-- RLS example (read-only API role sees only its org);
-- assumes the rollup tables carry a denormalised org_id column
-- (the base DDL keys them by deployment_id only)
alter table usage_rollup_daily enable row level security;
create policy org_isolation on usage_rollup_daily
  using (org_id = current_setting('app.current_org')::uuid);
```
Your API runs `select set_config('app.current_org', '<org-uuid>', false);` after auth (plain `SET` cannot take a bind parameter).
---
## OpenAPI (ingest + suggestions)
```yaml
openapi: 3.0.3
info:
  title: KACHING API
  version: 1.0.0
servers:
  - url: https://ingest.kaching.yourdomain.com
paths:
  /v1/ingest/usage:
    post:
      security: [{ IngestToken: [] }]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/KachingUsageV1'
      responses:
        '202': { description: accepted }
        '400': { description: bad request }
        '401': { description: unauthorized }
        '409': { description: duplicate (idempotent) }
  /v1/suggestions:
    get:
      security: [{ IAPAuth: [] }]
      parameters:
        - in: query
          name: deployment_id
          required: true
          schema: { type: string, format: uuid }
      responses:
        '200':
          description: list
          content:
            application/json:
              schema:
                type: array
                items: { $ref: '#/components/schemas/Suggestion' }
components:
  securitySchemes:
    IngestToken:
      type: http
      scheme: bearer
    IAPAuth:
      type: http
      scheme: bearer
  schemas:
    KachingUsageV1:
      type: object
      required: [schema, deployment_id, agent, node, job, metrics, sig]
      properties:
        schema: { type: string, enum: ["kaching.v1.usage"] }
        deployment_id: { type: string, format: uuid }
        agent: { type: object, required: [agent_id, agent_type] }
        node: { type: object, required: [node_id, hostname] }
        job: { type: object, required: [job_id, ucxl_addr, started_at, status] }
        metrics: { type: object }
        llm: { type: object }
        sig:
          type: object
          required: [algo, key_id, signature]
          properties:
            algo: { type: string, enum: ["ed25519"] }
            key_id: { type: string }
            signature: { type: string } # base64
    Suggestion:
      type: object
      properties:
        suggestion_id: { type: string, format: uuid }
        kind: { type: string }
        rationale: { type: string }
        target_plan: { type: string }
        diffs: { type: object }
---
## Go reference: Cloud Run **ingest** handler (concise skeleton)
```go
package main
import (
	"context"
	"crypto/ed25519"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
)

type Usage struct {
	Schema       string `json:"schema"`
	DeploymentID string `json:"deployment_id"`
	Agent        struct {
		AgentID   string `json:"agent_id"`
		AgentType string `json:"agent_type"`
		Version   string `json:"version"`
	} `json:"agent"`
	Node struct {
		NodeID   string `json:"node_id"`
		Hostname string `json:"hostname"`
		HwClass  string `json:"hw_class"`
	} `json:"node"`
	Job struct {
		JobID         string     `json:"job_id"`
		CorrelationID string     `json:"correlation_id"`
		UCXLAddr      string     `json:"ucxl_addr"`
		StartedAt     time.Time  `json:"started_at"`
		EndedAt       *time.Time `json:"ended_at"`
		Status        string     `json:"status"`
		Meta          any        `json:"meta"`
	} `json:"job"`
	Metrics any `json:"metrics"`
	LLM     any `json:"llm"`
	Sig     struct {
		Algo      string `json:"algo"`
		KeyID     string `json:"key_id"`
		Signature string `json:"signature"`
	} `json:"sig"`
}

var topic *pubsub.Topic

func main() {
	ctx := context.Background()
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	topicName := os.Getenv("PUBSUB_TOPIC") // usage-events
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatal(err)
	}
	topic = client.Topic(topicName)
	// Required when publishing with an OrderingKey; publishes fail otherwise.
	topic.EnableMessageOrdering = true
	http.HandleFunc("/v1/ingest/usage", handleIngest)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleIngest(w http.ResponseWriter, r *http.Request) {
	// 1) Auth: bearer token
	if !validateIngestToken(r.Header.Get("Authorization")) {
		http.Error(w, "unauthorized", http.StatusUnauthorized)
		return
	}
	// 2) Parse
	var u Usage
	dec := json.NewDecoder(r.Body)
	dec.DisallowUnknownFields()
	if err := dec.Decode(&u); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if u.Schema != "kaching.v1.usage" {
		http.Error(w, "bad schema", http.StatusBadRequest)
		return
	}
	// 3) Verify signature over canonical JSON (client signs payload without "sig")
	if !verifySignature(r.Context(), u) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	// 4) Publish to Pub/Sub with ordering key = deployment_id
	b, _ := json.Marshal(u)
	res := topic.Publish(r.Context(), &pubsub.Message{
		Data:        b,
		OrderingKey: u.DeploymentID,
		Attributes: map[string]string{
			"deployment_id": u.DeploymentID,
			"agent_type":    u.Agent.AgentType,
			"schema":        u.Schema,
		},
	})
	if _, err := res.Get(r.Context()); err != nil {
		http.Error(w, "queue error", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

func validateIngestToken(hdr string) bool {
	// Production code looks up the hash in Secret Manager or Redis;
	// always compare in constant time.
	want := os.Getenv("INGEST_TOKEN_HASH") // e.g., sha256 hex
	sum := sha256Hex(extractBearer(hdr))
	return subtle.ConstantTimeCompare([]byte(sum), []byte(want)) == 1
}

func verifySignature(ctx context.Context, u Usage) bool {
	// Lookup org/deployment → key_id → ed25519 public key (Secret Manager)
	pub := fetchEd25519Pub(ctx, u.Sig.KeyID)
	sigBytes, err := base64.StdEncoding.DecodeString(u.Sig.Signature)
	if err != nil || pub == nil {
		return false
	}
	payload := canonicalJSON(u /*excludeSig=*/, true)
	return ed25519.Verify(pub, payload, sigBytes)
}

func extractBearer(hdr string) string {
	return strings.TrimPrefix(hdr, "Bearer ")
}

func sha256Hex(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

// Deployment-specific helpers, stubbed here:
func fetchEd25519Pub(ctx context.Context, keyID string) ed25519.PublicKey {
	return nil // fetch the registered public key for keyID
}

func canonicalJSON(u Usage, excludeSig bool) []byte {
	return nil // e.g., RFC 8785 canonicalization with "sig" removed
}
```
> Notes
> • Enable **ordered delivery** on Pub/Sub if you need strict per-deployment ordering.
> • For signature canonicalization, fix field order and whitespace (e.g., RFC 8785 JSON Canonicalization Scheme).
---
## Rollups & thresholds (Cloud Run job)
- **Hourly job**:
- UPSERT into `usage_rollup_hourly` grouped by `(deployment_id, bucket_start, agent_type)`
- Evaluate **soft/hard quotas**; write `alert`/`suggestion`
- Stream hourly snapshot into **BigQuery** (for Looker dashboards)
- **Daily job**:
- UPSERT `usage_rollup_daily`
- Export aging `usage_sample` partitions → **GCS Parquet**
- `VACUUM/ANALYZE` current partitions
- Refresh materialized views
Run both with **Cloud Scheduler** triggering **Cloud Run jobs** (retries on failure).
---
## Networking & regions
- Put everything in one **primary region** close to your customers (e.g., `australia-southeast1` for you).
- Turn on **PITR** for Cloud SQL; add read replica in nearby region if needed.
- If you require **egress locality** for data sovereignty, pin **GCS** and **BigQuery** datasets to the same region and enforce org-scoped KMS keys.
---
## Observability & SLOs
- **SLI ideas**:
- Ingest success rate ≥ 99.9% (5-min windows)
- Pub/Sub → DB end-to-end latency p95 < 2 min
- Aggregator error rate < 0.1%
- **Dashboards** (Cloud Monitoring):
- Requests by status, Pub/Sub undelivered messages, Cloud Run CPU/mem, Cloud SQL CPU/IO wait
- **Alerts**:
- Pub/Sub backlog > threshold 10 min
- Cloud Run 5xx > 1% for 5 min
- SQL connection errors burst
---
## Terraform pointers (resource sketch)
```hcl
resource "google_pubsub_topic" "usage" {
  name                       = "usage-events"
  message_retention_duration = "604800s"
  kms_key_name               = google_kms_crypto_key.pubsub.id
}

resource "google_sql_database_instance" "pg" {
  name                = "kaching-pg"
  database_version    = "POSTGRES_15"
  region              = "australia-southeast1"
  encryption_key_name = google_kms_crypto_key.sql.id # CMEK is a top-level argument

  settings {
    tier = "db-custom-2-7680" # pick to taste
    backup_configuration {
      point_in_time_recovery_enabled = true
    }
    ip_configuration {
      private_network = google_compute_network.vpc.id
    }
  }
}

resource "google_cloud_run_service" "ingest" {
  name     = "kaching-ingest"
  location = "australia-southeast1"

  template {
    spec {
      containers {
        image = "gcr.io/PROJECT/kaching-ingest:latest"
        env {
          name  = "PUBSUB_TOPIC"
          value = google_pubsub_topic.usage.name
        }
      }
      service_account_name = google_service_account.ingest.email
    }
  }
}
```
---
## License issuance on GCP
- Keep a **vendor signing key** in **Cloud KMS**; sign a compact license JWT or binary blob containing: `org_id`, `plan_id`, `valid_from/to`, feature bitset, seat/node caps.
- `kaching-api` validates licenses locally with the **public key**; no call-home needed.
- Store customer licenses in Cloud SQL; deliver file via your portal.
---
## Migration path from pure on-prem KACHING
- Start with this hosted KACHING; if a customer requires **air-gapped**, deploy the same services via **GKE-on-prem** or a **single-VM** bundle (Postgres + NATS + the same services).
- Keep the event schema 1:1 so both modes are supported.
---
## What to build first (so you can ship)
1. **Ingest** (Cloud Run) + **Pub/Sub** + **aggregator** writing to **Cloud SQL** (raw + hourly).
2. **Threshold engine** (soft quota → `alert` + `suggestion`) mapped to your three price ladders.
3. **API** read endpoints + IAP-protected minimal UI tiles: tier utilisation, recent alerts, suggested upgrade.
4. **Daily job** to Parquet export + Looker Studio over BigQuery for internal analytics.
## TODO
- Ingestion gateways: Implement consistent event schema and authenticated ingestion across CHORUS/WHOOSH/RUSTLE/SLURP; add collectors per agent type.
- Policy enforcement: Apply license/plan/quotas in orchestration paths (WHOOSH) with alerts and upgrade nudges.
- Rollups to HQ: Optional redacted rollups push with privacy controls; add unit/integration tests.
- Dashboard completeness: Wire cross-org/deployment/node views to live telemetry with filtering and drill-down.