Below is a drop-in, on-prem-friendly telemetry and pricing brain for CHORUS. It is opinionated, PostgreSQL-first, offline-capable, and designed to drive upsell without metered compute.
KACHING: design spec (schema + event flow)
What it does
-
Collects per-job agent metrics (CPU/GPU secs, RAM peak, I/O, context ops).
-
Rolls up to hourly/daily org-level usage.
-
Compares against license/tier limits and budgets.
-
Emits upgrade suggestions + alerts.
-
(Optional) Pushes redacted rollups to HQ for global analytics.
1) Data model (PostgreSQL)
Core entities
- org — a customer (keep this even on single-tenant installs; it future-proofs).
- deployment — an installation of CHORUS under an org.
- node — a physical/virtual machine running agents.
- agent — a logical worker (e.g., SLURP_ingest, UCXL_resolver).
- job — a unit of work (has a UCXL address).
- usage_sample — raw metrics per job.
- usage_rollup_* — materialized summaries.
- license, pricing_plan, quota, budget — monetisation controls.
- feature_flag — enables premium capabilities.
- alert, suggestion — user-facing nudges/notifications.
- api_key, ingest_token — auth for push events.
Partition time-series tables by day; use pg_partman or native declarative partitioning.
DDL (core)
-- orgs & deployments
-- org: one row per customer. Kept even on single-tenant installs to
-- future-proof multi-tenant hosting.
create table org (
org_id uuid primary key,
name text not null,
created_at timestamptz not null default now()
);
-- deployment: one CHORUS installation under an org. timezone drives
-- day-bucketing for that site's rollups (default UTC).
create table deployment (
deployment_id uuid primary key,
org_id uuid not null references org(org_id),
name text not null,
timezone text not null default 'UTC',
created_at timestamptz not null default now()
);
-- nodes & agents
-- node: a physical/virtual machine inside a deployment. labels is free-form
-- jsonb for scheduling/targeting metadata.
create table node (
node_id uuid primary key,
deployment_id uuid not null references deployment(deployment_id),
hostname text not null,
hw_class text, -- e.g., "i5-12400 + RTX3060"
labels jsonb not null default '{}',
created_at timestamptz not null default now()
);
-- agent: a logical worker. node_id is nullable because an agent may not be
-- pinned to a specific machine at registration time.
create table agent (
agent_id uuid primary key,
deployment_id uuid not null references deployment(deployment_id),
node_id uuid references node(node_id),
agent_type text not null, -- e.g., "SLURP_ingest"
version text,
labels jsonb not null default '{}',
created_at timestamptz not null default now()
);
-- jobs: one row per unit of work.
-- The idempotency rules state duplicates are UPSERTed on correlation_id,
-- which requires a unique constraint to upsert against. Postgres permits
-- multiple NULLs under UNIQUE, so jobs submitted without a key are fine.
-- NOTE(review): if caller-supplied keys can collide across deployments,
-- widen this to a composite key — confirm against the ingest path.
create table job (
    job_id uuid primary key,
    agent_id uuid not null references agent(agent_id),
    ucxl_addr text not null,
    started_at timestamptz not null,
    ended_at timestamptz, -- null while the job is still running
    status text not null check (status in ('running','succeeded','failed','canceled')),
    correlation_id text, -- idempotency key from caller
    meta jsonb not null default '{}',
    constraint job_correlation_id_key unique (correlation_id)
);
-- raw usage: one sample per job event.
-- Declared as range-partitioned on observed_at per the retention plan
-- (daily partitions; create children via pg_partman or a scheduled job).
-- On a partitioned table the partition key must be part of the primary key,
-- hence the composite (sample_id, observed_at).
create table usage_sample (
    sample_id uuid not null,
    job_id uuid not null references job(job_id),
    observed_at timestamptz not null,
    cpu_seconds numeric(18,6) not null default 0,
    gpu_seconds numeric(18,6) not null default 0,
    ram_mb_peak numeric(18,3) not null default 0,
    disk_io_mb numeric(18,3) not null default 0,
    net_in_mb numeric(18,3) not null default 0,
    net_out_mb numeric(18,3) not null default 0,
    context_reads integer not null default 0,
    context_writes integer not null default 0,
    context_bytes_read bigint not null default 0,
    context_bytes_written bigint not null default 0,
    model_name text, -- if any LLM was used (local/cloud)
    model_tokens_in bigint default 0,
    model_tokens_out bigint default 0,
    flags jsonb not null default '{}',
    primary key (sample_id, observed_at)
) partition by range (observed_at);
-- rollups (hourly & daily)
-- Hourly rollup keyed by (deployment, hour bucket, agent_type); written by
-- the aggregator worker via upsert.
-- NOTE(review): ram_mb_peak is presumably max() over samples while the
-- other metrics are sums — confirm against the aggregator job.
create table usage_rollup_hourly (
deployment_id uuid not null references deployment(deployment_id),
bucket_start timestamptz not null, -- aligned to hour
agent_type text not null,
cpu_seconds numeric(18,6) not null,
gpu_seconds numeric(18,6) not null,
ram_mb_peak numeric(18,3) not null,
net_in_mb numeric(18,3) not null,
net_out_mb numeric(18,3) not null,
context_reads bigint not null,
context_writes bigint not null,
context_bytes_read bigint not null,
context_bytes_written bigint not null,
model_tokens_in bigint not null,
model_tokens_out bigint not null,
jobs_succeeded bigint not null,
jobs_failed bigint not null,
primary key (deployment_id, bucket_start, agent_type)
);
-- Daily rollup keyed by (deployment, day); day is bucketed in the
-- deployment's timezone (see deployment.timezone).
create table usage_rollup_daily (
deployment_id uuid not null references deployment(deployment_id),
day date not null,
cpu_seconds numeric(18,6) not null,
gpu_seconds numeric(18,6) not null,
context_bytes_written bigint not null,
seats_active integer not null default 0,
nodes_active integer not null default 0,
orchestration_peak_concurrency integer not null default 0,
model_tokens_in bigint not null,
model_tokens_out bigint not null,
primary key (deployment_id, day)
);
-- licensing / pricing
-- pricing_plan: published plan catalog; limits and feature lists live in
-- meta jsonb so plans can evolve without schema changes.
create table pricing_plan (
plan_id text primary key, -- e.g., 'SMB_Pro', 'Mid_Business'
meta jsonb not null -- published plan limits/features
);
-- license: vendor-signed entitlement for an org; validated locally against
-- the vendor public key (no call-home).
-- NOTE(review): consider a check (valid_to >= valid_from) — confirm with
-- the license-issuance flow.
create table license (
license_id uuid primary key,
org_id uuid not null references org(org_id),
plan_id text not null references pricing_plan(plan_id),
seats_limit integer,
nodes_limit integer,
features jsonb not null, -- e.g., {"temporal_nav": true, "federation": false}
valid_from date not null,
valid_to date not null,
signed_blob bytea not null -- vendor-signed license
);
-- quota: a named limit per deployment evaluated by the threshold engine.
-- CHECK constraints pin the documented value sets (same style as job.status
-- and alert.severity); the unique constraint guarantees queries joining on
-- (deployment_id, name, period) cannot fan out.
create table quota (
    quota_id uuid primary key,
    deployment_id uuid not null references deployment(deployment_id),
    name text not null, -- e.g., 'context_bytes', 'temporal_queries'
    period text not null check (period in ('daily','monthly','rolling_30d')),
    hard_limit bigint, -- null => unlimited in plan
    soft_threshold bigint, -- trigger suggestion at e.g. 80%
    created_at timestamptz not null default now(),
    constraint quota_deployment_name_period_key unique (deployment_id, name, period)
);
-- budget: a usage cap with an enforcement action; the action drives the
-- runtime knobs (warn/throttle/block/fallback_model) in CHORUS.
create table budget (
    budget_id uuid primary key,
    deployment_id uuid not null references deployment(deployment_id),
    scope text not null, -- 'ingest','reason','orchestration'
    period text not null check (period in ('daily','weekly','monthly')),
    limit_units numeric(18,6) not null, -- arbitrary unit (e.g., cpu_seconds)
    action text not null check (action in ('warn','throttle','block','fallback_model')),
    created_at timestamptz not null default now()
);
-- alerts & suggestions
-- alert: user-facing notification emitted by the threshold engine; context
-- jsonb carries structured details (e.g., which quota is nearing).
create table alert (
alert_id uuid primary key,
deployment_id uuid not null references deployment(deployment_id),
created_at timestamptz not null default now(),
severity text not null check (severity in ('info','warn','error')),
code text not null, -- e.g., 'QUOTA_NEARING'
message text not null,
context jsonb not null default '{}',
acknowledged boolean not null default false
);
-- suggestion: upsell nudge generated from overages; diffs quantifies what
-- the customer gains by upgrading.
create table suggestion (
suggestion_id uuid primary key,
deployment_id uuid not null references deployment(deployment_id),
created_at timestamptz not null default now(),
kind text not null, -- 'upgrade_tier','enable_feature','tune_pipeline'
rationale text not null,
target_plan text, -- suggested plan id
diffs jsonb not null default '{}', -- what they gain quantitatively
shown_to_user boolean not null default false,
accepted boolean -- null until the user responds
);
-- auth for ingestion
-- ingest_token: per-deployment bearer token; only the hash is stored, and
-- scopes narrow what the token may push.
create table ingest_token (
token_id uuid primary key,
deployment_id uuid not null references deployment(deployment_id),
token_hash bytea not null, -- store only hash
scopes text[] not null, -- ['ingest:usage','ingest:jobs']
created_at timestamptz not null default now(),
expires_at timestamptz -- null => non-expiring
);
-- convenience: daily seat/node activity
-- agent.deployment_id is a non-null FK, so the extra inner join to
-- deployment filtered nothing and only added cost — read the deployment id
-- straight off agent. Grouping is by name, not position, for clarity.
create materialized view mv_daily_activity as
select
    a.deployment_id,
    date_trunc('day', j.started_at) as day,
    -- seats: only agents that completed work successfully that day
    count(distinct a.agent_id) filter (where j.status = 'succeeded') as agents_active,
    count(distinct a.node_id) as nodes_active
from job j
join agent a on a.agent_id = j.agent_id
group by
    a.deployment_id,
    date_trunc('day', j.started_at);
Indexes you’ll want: (job.agent_id), (usage_sample.job_id), (usage_sample.observed_at), (usage_rollup_hourly.bucket_start), (alert.deployment_id, created_at desc), (quota.deployment_id, name).
2) Telemetry event flow
On each agent run (end-of-job or periodic heartbeat):
-
Agent emits a signed JSON payload to the local KACHING Ingest API.
-
KACHING validates token + signature, dedups via
correlation_id. -
Persist raw
job+usage_sample. -
Stream to Aggregator worker (local queue: Redis Streams/NATS JetStream).
-
Aggregator updates hourly/daily rollups and checks quota/budget.
-
If thresholds breached → create
alert+suggestion. -
(Optional) Post redacted rollups to HQ (batch, e.g., every 6h) over mTLS.
Event schema (agent → ingest)
{
"schema": "kaching.v1.usage",
"deployment_id": "6d0b1bcb-...-9c9d",
"agent": {
"agent_id": "0f32...ab",
"agent_type": "SLURP_ingest",
"version": "1.7.3"
},
"node": {
"node_id": "4aa0...de",
"hostname": "raven01",
"hw_class": "Ryzen 5600 + RTX 3060 12G"
},
"job": {
"job_id": "a3e8...90",
"correlation_id": "ucxl:alpha:2025-08-14T08:03:05Z:42",
"ucxl_addr": "ucxl://any:finance@project:alpha/*/report.md",
"started_at": "2025-08-14T08:03:00Z",
"ended_at": "2025-08-14T08:03:05Z",
"status": "succeeded",
"meta": { "workflow": "SLURP", "source": "Confluence" }
},
"metrics": {
"cpu_seconds": 48.5,
"gpu_seconds": 3.1,
"ram_mb_peak": 520,
"disk_io_mb": 18.2,
"net_in_mb": 0.5,
"net_out_mb": 1.2,
"context_reads": 114,
"context_writes": 7,
"context_bytes_read": 812345,
"context_bytes_written": 1280033
},
"llm": {
"model_name": "llama3.1:8b-q4",
"tokens_in": 1820,
"tokens_out": 740,
"provider": "local"
},
"sig": {
"algo": "ed25519",
"key_id": "agentkey-02",
"signature": "<base64>"
}
}
Idempotency rules
- job.correlation_id is required; duplicates are UPSERTed (last one wins if timestamps advance).
- Clock skew is tolerated: if ended_at < started_at by a small delta, accept and flag.
Security
-
mTLS between agents and KACHING.
-
Per-deployment
ingest_tokenwith narrow scopes. -
Payload signature checked against registered
key_id(rotate quarterly). -
PII-free by default; redact
ucxl_addrsegments via local policy if needed.
3) Aggregation & thresholds
Rollup jobs
-
Hourly: group by
deployment_id, agent_type, bucket_start. -
Daily: group by
deployment_id, day. -
Maintain peak orchestration concurrency using job overlap counts (interval tree or
btree_giston(started_at, ended_at)).
Threshold engine
-
Load effective limits from
license.features+quota. -
Example checks:
- Soft: context_bytes_written (30d) ≥ 80% of limit → QUOTA_NEARING.
- Hard: nodes_active > license.nodes_limit → HARD_BLOCK (if policy says).
- Budget: gpu_seconds (daily) > budget → emit BUDGET_EXCEEDED with the policy action (fallback_model etc.).
-
Suggestion generator
-
Map overages to plan ladder deltas (from your published
pricing_plan.meta):- “Upgrade Pro → Business: +3× context, +multi-site federation”
-
Compute quantitative deltas (e.g., “your 30-day context is 1.8× current cap; Business raises cap to 3.0×”).
4) Example queries that drive the UI/upsell
A) Soft quota nearing (30-day rolling)
-- Soft-quota check: 30-day rolling context writes vs. the deployment's
-- 'context_bytes' quota. A RANGE frame over the date bounds the window to
-- 30 calendar days even when some days have no rollup row; the original
-- ROWS BETWEEN 29 PRECEDING frame counted 30 *rows*, overspanning calendar
-- time whenever days were missing.
with windowed as (
    select
        deployment_id,
        day,
        sum(context_bytes_written) over (
            partition by deployment_id
            order by day
            range between interval '29 days' preceding and current row
        ) as bytes_30d
    from usage_rollup_daily
)
select w.deployment_id, w.day, w.bytes_30d, q.soft_threshold, q.hard_limit
from windowed w
join quota q
  on q.deployment_id = w.deployment_id
 and q.name = 'context_bytes'
 and q.period = 'rolling_30d'
where w.bytes_30d >= q.soft_threshold;
B) Peak concurrency
-- approximate: count overlapping jobs at minute resolution.
-- coalesce() is essential: ended_at is null for still-running jobs and
-- generate_series(start, null) yields zero rows, so the original silently
-- dropped every running job despite including 'running' in the filter.
select deployment_id, bucket_minute, max(concurrency) as peak
from (
    select
        a.deployment_id,
        date_trunc('minute', g.bucket) as bucket_minute,
        count(*) as concurrency
    from job j
    join agent a on a.agent_id = j.agent_id
    cross join lateral generate_series(
        j.started_at,
        coalesce(j.ended_at, now()), -- open-ended jobs count up to "now"
        interval '1 minute'
    ) as g(bucket)
    where j.status in ('running','succeeded')
    group by a.deployment_id, date_trunc('minute', g.bucket)
) t
group by deployment_id, bucket_minute
order by peak desc
limit 1;
C) “Who to upsell this week?”
-- Weekly upsell shortlist: deployments with the most QUOTA_NEARING alerts.
-- NOTE(review): alert has no "name" column, so the bare "name" in the
-- original resolved to deployment.name and never matched a quota name; the
-- quota name is assumed to be recorded in alert.context by the threshold
-- engine — confirm the jsonb key against the aggregator. Also, Postgres
-- does not allow arithmetic over output aliases in ORDER BY, so the sort
-- expression is spelled out.
select d.deployment_id,
       count(*) filter (where a.context->>'name' = 'context_bytes') as context_near,
       count(*) filter (where a.context->>'name' = 'temporal_queries') as temporal_near
from alert a
join deployment d on d.deployment_id = a.deployment_id
where a.code = 'QUOTA_NEARING'
  and a.created_at > now() - interval '7 days'
group by d.deployment_id
order by count(*) filter (
    where a.context->>'name' in ('context_bytes','temporal_queries')
) desc
limit 25;
5) API surfaces (local only by default)
Ingest
-
POST /v1/ingest/usage: acceptskaching.v1.usage(above) -
POST /v1/ingest/job-status: minimal heartbeat/status updates
Dashboards
-
GET /v1/usage/daily?deployment_id=...&from=...&to=... -
GET /v1/limits/effective?deployment_id=...(license + quotas merged) -
GET /v1/alerts?deployment_id=... -
GET /v1/suggestions?deployment_id=...
Admin
-
POST /v1/quota(create/update) -
POST /v1/budget -
POST /v1/license/activate(uploads vendor-signed blob) -
POST /v1/tokens(issue/rotate ingest tokens)
Auth: local mTLS + Authorization: Bearer <token>; all responses cacheable 60s.
6) Deployment architecture (on-prem first)
-
KACHING Core (Go/Rust service)
-
HTTP ingest + API
-
Aggregator worker
-
Scheduler (rollups, alerts)
-
Optional “HQ Sync” module (disabled by default)
-
-
State
-
PostgreSQL 15+ (enable
pg_partman/ native range partitioning) -
Redis/NATS for event buffer (optional; fall back to
COPY-on-commit if absent)
-
-
Packaging
-
Systemd unit or Docker Compose
-
Helm chart for k8s clusters (nodeSelector for DB locality)
-
-
Resilience
-
Backpressure: agents buffer to disk (bounded queue) if ingest is unavailable
-
Idempotent writes via
correlation_id -
Daily VACUUM/ANALYZE; weekly
REFRESH MATERIALIZED VIEW mv_daily_activity
-
7) Retention & performance
-
Raw
usage_sample: 14–30 days hot; downsample into rollups; archive to parquet on local object storage (MinIO) monthly. -
Rollups: keep 24 months.
-
Partition keys:
usage_sample.observed_at(daily),usage_rollup_hourly.bucket_start(daily). -
Use
BRINindexes for time partitions;btreefor FK lookups.
8) License enforcement (light-touch)
-
Signed license (
license.signed_blob) includes: plan id, seat/node caps, expiry, feature bitset, signature. -
Enforce soft by default (warn + suggest), hard only when explicitly configured (enterprise asks).
-
Local-only check; never phones home unless HQ Sync is enabled.
Prove governance, not just spend. KACHING rolls up per-job context ops, model tokens, and concurrency into org-level signals, compares them against quotas/budgets, and emits policy actions (warn/throttle/block/fallback). This is how we enforce runtime guardrails in a way boards and auditors can verify.
9) Budget actions (runtime knobs CHORUS can use)
When KACHING emits BUDGET_EXCEEDED with action:
-
warn: post in CHORUS banner. -
throttle: cap orchestration concurrency N. -
block: reject new jobs in that scope. -
fallback_model: instruct agent to switch to cheaper local model; pass apolicy_decisionpayload back via CHORUS control channel.
10) UX hooks (what users see)
-
Tier Utilization card: seats, nodes, temporal queries, context volume; sparkline + % of cap.
-
Bottleneck callouts: “Temporal queries blocked 3× this week.”
-
Clear upgrade CTA: shows concrete deltas (“+3× context window, +multi-site federation”).
11) Minimal agent SDK shim
Config (YAML)
# Agent-side KACHING shim configuration.
# The token comes from the environment (KACHING_TOKEN); the ed25519 private
# key stays on disk and never appears in config. Nesting restored — the
# flattened form was not valid YAML structure.
kaching:
  ingest_url: https://kaching.local/v1/ingest/usage
  token: ${KACHING_TOKEN}
  key_id: agentkey-02
  key_path: /etc/chorus/agent_ed25519
  flush_interval_ms: 2000      # batch flush cadence
  max_batch: 200               # max events per POST
  dedupe_key: correlation_id   # idempotency field forwarded to ingest
  redact:
    ucxl_addr_segments: ["credentials", "secrets"]
Agent integration (pseudo-code)
with Job("ucxl://.../report.md") as job:
# ... do work ...
kaching.emit_usage(job_id=job.id,
cpu_seconds=cpu,
gpu_seconds=gpu,
context_reads=reads,
context_writes=writes,
model_name=model,
tokens_in=tin, tokens_out=tout)
12) What this unlocks for pricing
-
Flat annual licenses remain clean because you can prove usage & growth.
-
Automated, evidence-based upsells (“you’re consistently at 88–95% of context cap; Business tier gives +3× headroom and federation you already need”).
-
If you ever add paid add-ons (e.g., optional hosted LLM fallback), the hooks (model tokens, provider) are already in the schema.
13) Implementation order (2-week sprint)
-
DDL + partitioning + ingest auth (ed25519, token hash).
-
Ingest API + idempotent writes + agent shim.
-
Hourly/daily rollups + three alerts (
QUOTA_NEARING,BUDGET_EXCEEDED,NODES_LIMIT_EXCEEDED). -
Suggestion generator v1 (map to your three ladders).
-
Minimal web UI cards (tier utilization, alerts, CTA).
-
Optional: HQ Sync (batched, redacted).
KACHING on Google Cloud is a tight, secure, hybrid telemetry platform that your on-prem CHORUS installs can push into. Below is a practical blueprint you can build from today: services, IAM, network, data path, OpenAPI, and a Go reference skeleton.
KACHING on GCP: production architecture
Core services (minimal, proven stack)
-
Cloud Run (fully managed)
-
kaching-ingest(public endpoint, locked by mutual auth + signed payloads) -
kaching-api(dashboards/admin; private behind IAP) -
kaching-aggregator(background rollups; also run as Cloud Run jobs)
-
-
Pub/Sub
-
usage-events(raw events) -
dead-letter-usage(DLQ with retry policy)
-
-
Cloud SQL for PostgreSQL 15+ (primary persistence; point-in-time recovery enabled)
-
Cloud Storage
kaching-archive(monthly Parquet archives; optional customer exports)
-
BigQuery
kaching_analyticsdataset (hourly/daily rollups mirrored for analytics & Looker)
-
Secret Manager (ingest tokens, per-org agent public keys, DB creds)
-
Cloud KMS (CMEK for Cloud SQL, GCS, Pub/Sub; license signing keys)
-
Cloud Scheduler + Cloud Run jobs (hourly/daily rollup + archive tasks)
-
Cloud Logging + Cloud Monitoring (SLIs/SLOs; alerting policies)
-
Cloud Armor (WAF in front of HTTPS LB to Cloud Run)
-
Identity-Aware Proxy (IAP) (protect
kaching-apiadmin UI) -
VPC + Serverless VPC Access (Cloud Run ↔ Cloud SQL private IP)
Optional (later): Apigee if you need enterprise API governance; Dataflow if rollups outgrow SQL.
Data flow (event path)
Telemetry is signed + idempotent (ed25519, correlation IDs) and multi-tenant isolated (RLS/KMS/CMEK); redacted rollups can sync to HQ for cross-deployment analytics.
-
On-prem agent completes a job → emits signed
kaching.v1.usageJSON over HTTPS tokaching-ingest.-
Auth:
Authorization: Bearer <ingest_token>(hash validated) -
Integrity: detached Ed25519 signature over canonical JSON (agent key registered per org)
-
Idempotency:
job.correlation_id
-
-
kaching-ingestperforms fast stateless checks → pushes message to Pub/Subusage-events(attributes includeorg_id,deployment_id,schema_version,event_ts). -
kaching-aggregator(Pub/Sub push/pull)-
Writes raw to
usage_sample/jobin Cloud SQL (in txn) -
Maintains hourly/daily rollups (SQL upserts)
-
Evaluates quota/budget → inserts
alert/suggestion -
Mirrors fresh rollups to BigQuery (streaming inserts or 5-min batch)
-
-
Cloud Run job (hourly, daily)
-
VACUUM/ANALYZE hot partitions
-
Export prior month raw to GCS Parquet
-
Advance materialized views
-
-
kaching-apiserves dashboards (IAP-protected) + admin endpoints.
Multi-tenancy & data isolation
-
All rows keyed by
org_id+deployment_id; enforce RLS (Row Level Security) in PostgreSQL for any shared read paths. -
Each org has a KMS-wrapped secret namespace (ingest tokens, agent public keys).
-
CMEK on Cloud SQL & GCS; per-org key rings if you need tenant keying.
Security model (pragmatic & strong)
-
Ingress: HTTPS LB → Cloud Armor → Cloud Run (
kaching-ingest). -
mTLS (optional): If you control agent certs, terminate mTLS at the LB; otherwise rely on:
-
Bearer ingest token (DB-stored bcrypt/argon2 hash)
-
Ed25519 signature (payload integrity, replay window ≤ 5 min)
-
Idempotency via
(org_id, correlation_id)unique index
-
-
Secrets: only via Secret Manager; short-lived Cloud SQL IAM tokens (no static passwords in code).
-
Least-privilege IAM service accounts (see below).
IAM layout (service accounts & roles)
-
sa-kaching-ingest-
roles/pubsub.publisher(tousage-events) -
roles/secretmanager.secretAccessor(read token pepper + org pubkeys)
-
-
sa-kaching-aggregator-
roles/cloudsql.client,roles/secretmanager.secretAccessor -
roles/pubsub.subscriber(fromusage-events) -
roles/storage.objectAdmin(to write Parquet archives) -
roles/bigquery.dataEditor(analytics dataset)
-
-
sa-kaching-api-
roles/cloudsql.client,roles/secretmanager.secretAccessor -
Protected by IAP; org admins authenticated via Google Identity
-
-
sa-kaching-scheduler- Invokes Cloud Run jobs; minimal runner roles
-
KMS: grant each SA
cryptoKeyEncrypterDecrypteron CMEK keys.
Database schema
Use the schema we defined earlier (orgs, deployments, nodes, agents, jobs, usage_sample, rollups, license, quota, budget, alert, suggestion, tokens, etc.) unchanged—it fits Cloud SQL.
Add:
-- RLS example (read-only API role sees only its org)
-- usage_rollup_daily is keyed by deployment_id and has no org_id column,
-- so the original "org_id = ..." predicate could not run; resolve the org
-- through the deployment table instead.
alter table usage_rollup_daily enable row level security;
create policy org_isolation on usage_rollup_daily
using (
    deployment_id in (
        select d.deployment_id
        from deployment d
        where d.org_id = current_setting('app.current_org')::uuid
    )
);
Your API sets set app.current_org = '<org-uuid>'; after auth.
OpenAPI (ingest + suggestions)
openapi: 3.0.3
info:
  title: KACHING API
  version: 1.0.0
servers:
  - url: https://ingest.kaching.yourdomain.com
paths:
  /v1/ingest/usage:
    post:
      security: [{ IngestToken: [] }]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/KachingUsageV1'
      responses:
        '202': { description: accepted }
        '400': { description: bad request }
        '401': { description: unauthorized }
        '409': { description: duplicate (idempotent) }
  /v1/suggestions:
    get:
      security: [{ IAPAuth: [] }]
      parameters:
        # Each parameter field is its own YAML key — the original packed
        # them into one scalar with semicolons, which does not parse.
        - in: query
          name: deployment_id
          required: true
          schema: { type: string, format: uuid }
      responses:
        '200':
          description: list
          content:
            application/json:
              schema:
                type: array
                items: { $ref: '#/components/schemas/Suggestion' }
components:
  securitySchemes:
    IngestToken:
      type: http
      scheme: bearer
    IAPAuth:
      type: http
      scheme: bearer
  schemas:
    KachingUsageV1:
      type: object
      required: [schema, deployment_id, agent, node, job, metrics, sig]
      properties:
        schema: { type: string, enum: ["kaching.v1.usage"] }
        deployment_id: { type: string, format: uuid }
        agent: { type: object, required: [agent_id, agent_type] }
        node: { type: object, required: [node_id, hostname] }
        job: { type: object, required: [job_id, ucxl_addr, started_at, status] }
        metrics: { type: object }
        llm: { type: object }
        sig:
          type: object
          required: [algo, key_id, signature]
          properties:
            algo: { type: string, enum: ["ed25519"] }
            key_id: { type: string }
            signature: { type: string } # base64
    Suggestion:
      type: object
      properties:
        suggestion_id: { type: string, format: uuid }
        kind: { type: string }
        rationale: { type: string }
        target_plan: { type: string }
        diffs: { type: object }
Go reference: Cloud Run ingest handler (concise skeleton)
package main
import (
"context"
"crypto/ed25519"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/pubsub"
)
// Usage mirrors the kaching.v1.usage event envelope. Metrics, LLM and
// Job.Meta are left loosely typed (any) because the ingest service only
// validates envelope fields and forwards the payload intact to Pub/Sub.
type Usage struct {
Schema string `json:"schema"` // must equal "kaching.v1.usage"
DeploymentID string `json:"deployment_id"`
// Emitting agent identity/version.
Agent struct {
AgentID string `json:"agent_id"`
AgentType string `json:"agent_type"`
Version string `json:"version"`
} `json:"agent"`
// Host machine the agent ran on.
Node struct {
NodeID string `json:"node_id"`
Hostname string `json:"hostname"`
HwClass string `json:"hw_class"`
} `json:"node"`
// Job envelope; CorrelationID is the caller-supplied idempotency key.
Job struct {
JobID string `json:"job_id"`
CorrelationID string `json:"correlation_id"`
UCXLAddr string `json:"ucxl_addr"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at"` // nil while running
Status string `json:"status"`
Meta any `json:"meta"`
} `json:"job"`
Metrics any `json:"metrics"`
LLM any `json:"llm"`
// Detached Ed25519 signature over the canonical payload (without "sig").
Sig struct {
Algo string `json:"algo"`
KeyID string `json:"key_id"`
Signature string `json:"signature"` // base64
} `json:"sig"`
}
var (
	// topic is the shared Pub/Sub publisher handle, initialized once in main.
	topic *pubsub.Topic
)

// main wires the ingest HTTP endpoint to the usage-events Pub/Sub topic.
func main() {
	ctx := context.Background()
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	topicName := os.Getenv("PUBSUB_TOPIC") // usage-events
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatal(err)
	}
	topic = client.Topic(topicName)
	// handleIngest publishes with an OrderingKey; the Go client rejects such
	// messages unless ordering is explicitly enabled on the topic handle.
	topic.EnableMessageOrdering = true

	http.HandleFunc("/v1/ingest/usage", handleIngest)

	// Cloud Run injects the listen port via $PORT; default to 8080 locally.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}
	log.Fatal(http.ListenAndServe(":"+port, nil))
}
// handleIngest authenticates, validates, and enqueues one kaching.v1.usage
// event. It is intentionally stateless: persistence and idempotency are
// handled downstream by the aggregator.
func handleIngest(w http.ResponseWriter, r *http.Request) {
	// Ingest is POST-only per the OpenAPI spec.
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// 1) Auth: bearer ingest token (hash compared in constant time).
	if !validateIngestToken(r.Header.Get("Authorization")) {
		http.Error(w, "unauthorized", http.StatusUnauthorized)
		return
	}
	// 2) Parse. Cap the body so a misbehaving agent cannot exhaust memory.
	var u Usage
	dec := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)) // 1 MiB
	dec.DisallowUnknownFields()
	if err := dec.Decode(&u); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if u.Schema != "kaching.v1.usage" {
		http.Error(w, "bad schema", http.StatusBadRequest)
		return
	}
	// 3) Verify signature over canonical JSON (client must sign without "sig").
	if !verifySignature(r.Context(), u) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	// 4) Publish to Pub/Sub with ordering key = deployment_id.
	b, err := json.Marshal(u)
	if err != nil {
		// Should be unreachable for a struct we just decoded, but never
		// silently drop an event.
		http.Error(w, "encode error", http.StatusInternalServerError)
		return
	}
	res := topic.Publish(r.Context(), &pubsub.Message{
		Data:        b,
		OrderingKey: u.DeploymentID,
		Attributes: map[string]string{
			"deployment_id": u.DeploymentID,
			"agent_type":    u.Agent.AgentType,
			"schema":        u.Schema,
		},
	})
	// Block until the publish is acknowledged so the agent can retry on 503.
	if _, err := res.Get(r.Context()); err != nil {
		http.Error(w, "queue error", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
// validateIngestToken checks the Authorization header value against the
// expected ingest-token digest using a constant-time comparison.
// NOTE(review): the reference digest comes from an env var in this skeleton;
// the inline comment says production should look it up in Secret Manager or
// Redis — confirm before shipping. extractBearer/sha256Hex are helpers
// defined elsewhere.
func validateIngestToken(hdr string) bool {
// Expect "Bearer abc"
// Lookup hashed value in Secret Manager or Redis; constant-time compare
want := os.Getenv("INGEST_TOKEN_HASH") // e.g., sha256 hex
got := extractBearer(hdr)
sum := sha256Hex(got)
return subtle.ConstantTimeCompare([]byte(sum), []byte(want)) == 1
}
// verifySignature checks the detached Ed25519 signature against the public
// key registered for u.Sig.KeyID. Returns false on any lookup/decode
// problem rather than letting a malformed request panic the handler.
func verifySignature(ctx context.Context, u Usage) bool {
	// Lookup org/deployment → key_id → ed25519 public key (Secret Manager)
	pub := fetchEd25519Pub(ctx, u.Sig.KeyID)
	// ed25519.Verify panics on a wrong-sized key; reject instead.
	if len(pub) != ed25519.PublicKeySize {
		return false
	}
	sigBytes, err := base64.StdEncoding.DecodeString(u.Sig.Signature)
	if err != nil {
		// The original discarded this error, treating garbage base64 as an
		// empty signature; fail fast instead.
		return false
	}
	// canonicalize payload without Sig; omitted here for brevity
	payload := canonicalJSON(u /*excludeSig=*/, true)
	return ed25519.Verify(pub, payload, sigBytes)
}
Notes
• Enable ordered delivery on Pub/Sub if you need strict per-deployment ordering.
• For signature canonicalization, fix field order and whitespace (e.g., RFC 8785 JSON Canonicalization Scheme).
Rollups & thresholds (Cloud Run job)
-
Hourly job:
-
UPSERT into
usage_rollup_hourlygrouped by(deployment_id, bucket_start, agent_type) -
Evaluate soft/hard quotas; write
alert/suggestion -
Stream hourly snapshot into BigQuery (for Looker dashboards)
-
-
Daily job:
-
UPSERT
usage_rollup_daily -
Export aging
usage_samplepartitions → GCS Parquet -
VACUUM/ANALYZEcurrent partitions -
Refresh materialized views
-
Run both with Cloud Scheduler → Cloud Run jobs (retries on failure).
Networking & regions
-
Put everything in one primary region close to your customers (e.g.,
australia-southeast1for you). -
Turn on PITR for Cloud SQL; add read replica in nearby region if needed.
-
If you require egress locality for data sovereignty, pin GCS and BigQuery datasets to the same region and enforce org-scoped KMS keys.
Observability & SLOs
-
SLI ideas:
-
Ingest success rate ≥ 99.9% (5-min windows)
-
Pub/Sub → DB end-to-end latency p95 < 2 min
-
Aggregator error rate < 0.1%
-
-
Dashboards (Cloud Monitoring):
- Requests by status, Pub/Sub undelivered messages, Cloud Run CPU/mem, Cloud SQL CPU/IO wait
-
Alerts:
-
Pub/Sub backlog > threshold 10 min
-
Cloud Run 5xx > 1% for 5 min
-
SQL connection errors burst
-
Terraform pointers (resource sketch)
# Raw usage-event topic. Seven-day retention gives the aggregator replay
# headroom; CMEK-encrypted with the dedicated Pub/Sub KMS key.
resource "google_pubsub_topic" "usage" {
name = "usage-events"
message_retention_duration = "604800s"
kms_key_name = google_kms_crypto_key.pubsub.id
}
# Primary Postgres instance: private-IP only (VPC), PITR enabled, CMEK via
# the SQL KMS key. Tier is a starting point — size to workload.
resource "google_sql_database_instance" "pg" {
name = "kaching-pg"
database_version = "POSTGRES_15"
region = "australia-southeast1"
settings {
tier = "db-custom-2-7680" # pick to taste
backup_configuration { point_in_time_recovery_enabled = true }
ip_configuration { private_network = google_compute_network.vpc.id }
encryption_key_name = google_kms_crypto_key.sql.id
}
}
# Ingest service on Cloud Run, running as the least-privilege ingest SA.
resource "google_cloud_run_service" "ingest" {
  name     = "kaching-ingest"
  location = "australia-southeast1"

  template {
    spec {
      containers {
        image = "gcr.io/PROJECT/kaching-ingest:latest"
        # HCL requires each argument on its own line; the original one-line
        # `env { name = ... value = ... }` does not parse.
        env {
          name  = "PUBSUB_TOPIC"
          value = google_pubsub_topic.usage.name
        }
      }
      service_account_name = google_service_account.ingest.email
    }
  }
}
License issuance on GCP
-
Keep a vendor signing key in Cloud KMS; sign a compact license JWT or binary blob containing:
org_id,plan_id,valid_from/to, feature bitset, seat/node caps. -
kaching-apivalidates licenses locally with the public key; no call-home needed. -
Store customer licenses in Cloud SQL; deliver file via your portal.
Migration path from pure on-prem KACHING
-
Start with this hosted KACHING; if a customer requires air-gapped, deploy the same services via GKE-on-prem or a single-VM bundle (Postgres + NATS + the same services).
-
Keep the event schema 1:1 so both modes are supported.
What to build first (so you can ship)
-
Ingest (Cloud Run) + Pub/Sub + aggregator writing to Cloud SQL (raw + hourly).
-
Threshold engine (soft quota →
alert+suggestion) mapped to your three price ladders. -
API read endpoints + IAP-protected minimal UI tiles: tier utilisation, recent alerts, suggested upgrade.
-
Daily job to Parquet export + Looker Studio over BigQuery for internal analytics.
TODO
- Ingestion gateways: Implement consistent event schema and authenticated ingestion across CHORUS/WHOOSH/RUSTLE/SLURP; add collectors per agent type.
- Policy enforcement: Apply license/plan/quotas in orchestration paths (WHOOSH) with alerts and upgrade nudges.
- Rollups to HQ: Optional redacted rollups push with privacy controls; add unit/integration tests.
- Dashboard completeness: Wire cross-org/deployment/node views to live telemetry with filtering and drill-down.