Files
CHORUS/docs/Modules/KACHING.md

33 KiB
Raw Permalink Blame History

Below is a drop-in, on-prem friendly telemetry + pricing brain for CHORUS. Its opinionated, PostgreSQL-first, offline-capable, and designed to drive upsell without metered compute.

KACHING: design spec (schema + event flow)

What it does

  • Collects per-job agent metrics (CPU/GPU secs, RAM peak, I/O, context ops).

  • Rolls up to hourly/daily org-level usage.

  • Compares against license/tier limits and budgets.

  • Emits upgrade suggestions + alerts.

  • (Optional) Pushes redacted rollups to HQ for global analytics.


1) Data model (PostgreSQL)

Core entities

  • org ← a customer (even on single-tenant installs keep this; it future-proofs).

  • deployment ← an installation of CHORUS under an org.

  • node ← physical/virtual machine running agents.

  • agent ← a logical worker (e.g., SLURP_ingest, UCXL_resolver).

  • job ← unit of work (has UCXL address).

  • usage_sample ← raw metrics per job.

  • usage_rollup_* ← materialized summaries.

  • license, pricing_plan, quota, budget ← monetisation controls.

  • feature_flag ← enables premium capabilities.

  • alert, suggestion ← user-facing nudges/notifications.

  • api_key, ingest_token ← auth for push events.

Partition time-series tables by day; use pg_partman or native declarative partitioning.

DDL (core)

-- orgs & deployments
create table org (
  org_id uuid primary key,
  name text not null,
  created_at timestamptz not null default now()
);

create table deployment (
  deployment_id uuid primary key,
  org_id uuid not null references org(org_id),
  name text not null,
  timezone text not null default 'UTC',
  created_at timestamptz not null default now()
);

-- nodes & agents
create table node (
  node_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  hostname text not null,
  hw_class text,         -- e.g., "i5-12400 + RTX3060"
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

create table agent (
  agent_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  node_id uuid references node(node_id),
  agent_type text not null,   -- e.g., "SLURP_ingest"
  version text,
  labels jsonb not null default '{}',
  created_at timestamptz not null default now()
);

-- jobs
create table job (
  job_id uuid primary key,
  agent_id uuid not null references agent(agent_id),
  ucxl_addr text not null,
  started_at timestamptz not null,
  ended_at timestamptz,
  status text not null check (status in ('running','succeeded','failed','canceled')),
  correlation_id text,           -- idempotency key from caller
  meta jsonb not null default '{}'
);

-- raw usage (partition by day on observed_at)
create table usage_sample (
  sample_id uuid primary key,
  job_id uuid not null references job(job_id),
  observed_at timestamptz not null,
  cpu_seconds numeric(18,6) not null default 0,
  gpu_seconds numeric(18,6) not null default 0,
  ram_mb_peak numeric(18,3) not null default 0,
  disk_io_mb numeric(18,3) not null default 0,
  net_in_mb numeric(18,3) not null default 0,
  net_out_mb numeric(18,3) not null default 0,
  context_reads integer not null default 0,
  context_writes integer not null default 0,
  context_bytes_read bigint not null default 0,
  context_bytes_written bigint not null default 0,
  model_name text,               -- if any LLM was used (local/cloud)
  model_tokens_in bigint default 0,
  model_tokens_out bigint default 0,
  flags jsonb not null default '{}'
);

-- rollups (hourly & daily)
create table usage_rollup_hourly (
  deployment_id uuid not null references deployment(deployment_id),
  bucket_start timestamptz not null,  -- aligned to hour
  agent_type text not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  ram_mb_peak numeric(18,3) not null,
  net_in_mb numeric(18,3) not null,
  net_out_mb numeric(18,3) not null,
  context_reads bigint not null,
  context_writes bigint not null,
  context_bytes_read bigint not null,
  context_bytes_written bigint not null,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  jobs_succeeded bigint not null,
  jobs_failed bigint not null,
  primary key (deployment_id, bucket_start, agent_type)
);

create table usage_rollup_daily (
  deployment_id uuid not null references deployment(deployment_id),
  day date not null,
  cpu_seconds numeric(18,6) not null,
  gpu_seconds numeric(18,6) not null,
  context_bytes_written bigint not null,
  seats_active integer not null default 0,
  nodes_active integer not null default 0,
  orchestration_peak_concurrency integer not null default 0,
  model_tokens_in bigint not null,
  model_tokens_out bigint not null,
  primary key (deployment_id, day)
);

-- licensing / pricing
create table pricing_plan (
  plan_id text primary key,   -- e.g., 'SMB_Pro', 'Mid_Business'
  meta jsonb not null         -- published plan limits/features
);

create table license (
  license_id uuid primary key,
  org_id uuid not null references org(org_id),
  plan_id text not null references pricing_plan(plan_id),
  seats_limit integer,
  nodes_limit integer,
  features jsonb not null,            -- e.g., {"temporal_nav": true, "federation": false}
  valid_from date not null,
  valid_to date not null,
  signed_blob bytea not null          -- vendor-signed license
);

create table quota (
  quota_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  name text not null,                  -- e.g., 'context_bytes', 'temporal_queries'
  period text not null,                -- 'daily','monthly','rolling_30d'
  hard_limit bigint,                   -- null => unlimited in plan
  soft_threshold bigint,               -- trigger suggestion at e.g. 80%
  created_at timestamptz not null default now()
);

create table budget (
  budget_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  scope text not null,                 -- 'ingest','reason','orchestration'
  period text not null,                -- 'daily','weekly','monthly'
  limit_units numeric(18,6) not null,  -- arbitrary unit (e.g., cpu_seconds)
  action text not null,                -- 'warn','throttle','block','fallback_model'
  created_at timestamptz not null default now()
);

-- alerts & suggestions
create table alert (
  alert_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  severity text not null check (severity in ('info','warn','error')),
  code text not null,                  -- e.g., 'QUOTA_NEARING'
  message text not null,
  context jsonb not null default '{}',
  acknowledged boolean not null default false
);

create table suggestion (
  suggestion_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  created_at timestamptz not null default now(),
  kind text not null,                  -- 'upgrade_tier','enable_feature','tune_pipeline'
  rationale text not null,
  target_plan text,                    -- suggested plan id
  diffs jsonb not null default '{}',   -- what they gain quantitatively
  shown_to_user boolean not null default false,
  accepted boolean
);

-- auth for ingestion
create table ingest_token (
  token_id uuid primary key,
  deployment_id uuid not null references deployment(deployment_id),
  token_hash bytea not null,           -- store only hash
  scopes text[] not null,              -- ['ingest:usage','ingest:jobs']
  created_at timestamptz not null default now(),
  expires_at timestamptz
);

-- convenience: daily seat/node activity
create materialized view mv_daily_activity as
select
  d.deployment_id,
  date_trunc('day', j.started_at) as day,
  count(distinct a.agent_id) filter (where j.status='succeeded') as agents_active,
  count(distinct a.node_id) as nodes_active
from job j
join agent a on a.agent_id = j.agent_id
join deployment d on d.deployment_id = a.deployment_id
group by 1,2;

Indexes youll want: (job.agent_id), (usage_sample.job_id), (usage_sample.observed_at), (usage_rollup_hourly.bucket_start), (alert.deployment_id, created_at desc), (quota.deployment_id, name).


2) Telemetry event flow

On each agent run (end-of-job or periodic heartbeat):

  1. Agent emits a signed JSON payload to the local KACHING Ingest API.

  2. KACHING validates token + signature, dedups via correlation_id.

  3. Persist raw job + usage_sample.

  4. Stream to Aggregator worker (local queue: Redis Streams/NATS JetStream).

  5. Aggregator updates hourly/daily rollups and checks quota/budget.

  6. If thresholds breached → create alert + suggestion.

  7. (Optional) Post redacted rollups to HQ (batch, e.g., every 6h) over mTLS.

Event schema (agent → ingest)

{
  "schema": "kaching.v1.usage",
  "deployment_id": "6d0b1bcb-...-9c9d",
  "agent": {
    "agent_id": "0f32...ab",
    "agent_type": "SLURP_ingest",
    "version": "1.7.3"
  },
  "node": {
    "node_id": "4aa0...de",
    "hostname": "raven01",
    "hw_class": "Ryzen 5600 + RTX 3060 12G"
  },
  "job": {
    "job_id": "a3e8...90",
    "correlation_id": "ucxl:alpha:2025-08-14T08:03:05Z:42",
    "ucxl_addr": "ucxl://any:finance@project:alpha/*/report.md",
    "started_at": "2025-08-14T08:03:00Z",
    "ended_at": "2025-08-14T08:03:05Z",
    "status": "succeeded",
    "meta": { "workflow": "SLURP", "source": "Confluence" }
  },
  "metrics": {
    "cpu_seconds": 48.5,
    "gpu_seconds": 3.1,
    "ram_mb_peak": 520,
    "disk_io_mb": 18.2,
    "net_in_mb": 0.5,
    "net_out_mb": 1.2,
    "context_reads": 114,
    "context_writes": 7,
    "context_bytes_read": 812345,
    "context_bytes_written": 1280033
  },
  "llm": {
    "model_name": "llama3.1:8b-q4",
    "tokens_in": 1820,
    "tokens_out": 740,
    "provider": "local"
  },
  "sig": {
    "algo": "ed25519",
    "key_id": "agentkey-02",
    "signature": "<base64>"
  }
}

Idempotency rules

  • job.correlation_id required; duplicates are UPSERTed (last one wins if timestamps advance).

  • Clock skew tolerated: if ended_at < started_at by small delta, accept and flag.

Security

  • mTLS between agents and KACHING.

  • Per-deployment ingest_token with narrow scopes.

  • Payload signature checked against registered key_id (rotate quarterly).

  • PII-free by default; redact ucxl_addr segments via local policy if needed.


3) Aggregation & thresholds

Rollup jobs

  • Hourly: group by deployment_id, agent_type, bucket_start.

  • Daily: group by deployment_id, day.

  • Maintain peak orchestration concurrency using job overlap counts (interval tree or btree_gist on (started_at, ended_at)).

Threshold engine

  • Load effective limits from license.features + quota.

  • Example checks:

    • Soft: context_bytes_written (30d) ≥ 80% of limit → QUOTA_NEARING.

    • Hard: nodes_active > license.nodes_limitHARD_BLOCK (if policy says).

    • Budget: gpu_seconds (daily) > budget → emit BUDGET_EXCEEDED with policy action (fallback_model etc).

Suggestion generator

  • Map overages to plan ladder deltas (from your published pricing_plan.meta):

    • “Upgrade Pro → Business: +3× context, +multi-site federation”
  • Compute quantitative deltas (e.g., “your 30-day context is 1.8× current cap; Business raises cap to 3.0×”).


4) Example queries that drive the UI/upsell

A) Soft quota nearing (30-day rolling)

with windowed as (
  select deployment_id,
         sum(context_bytes_written) over (
           partition by deployment_id
           order by day
           rows between 29 preceding and current row
         ) as bytes_30d,
         day
  from usage_rollup_daily
)
select w.deployment_id, w.day, w.bytes_30d, q.soft_threshold, q.hard_limit
from windowed w
join quota q on q.deployment_id = w.deployment_id and q.name='context_bytes' and q.period='rolling_30d'
where w.bytes_30d >= q.soft_threshold;

B) Peak concurrency

-- approximate: count overlapping running jobs at minute resolution
select deployment_id, bucket_minute, max(concurrency) as peak
from (
  select a.deployment_id,
         date_trunc('minute', g.bucket) as bucket_minute,
         count(*) as concurrency
  from job j
  join agent a on a.agent_id = j.agent_id
  join generate_series(j.started_at, j.ended_at, interval '1 minute') as g(bucket) on true
  where j.status in ('running','succeeded')
  group by 1,2
) t
group by 1,2
order by peak desc
limit 1;

C) “Who to upsell this week?”

select d.deployment_id,
       sum(case when name='context_bytes' then 1 else 0 end) as context_near,
       sum(case when name='temporal_queries' then 1 else 0 end) as temporal_near
from alert a
join deployment d on d.deployment_id=a.deployment_id
where a.code='QUOTA_NEARING' and a.created_at > now() - interval '7 days'
group by 1
order by (context_near + temporal_near) desc
limit 25;

5) API surfaces (local only by default)

Ingest

  • POST /v1/ingest/usage : accepts kaching.v1.usage (above)

  • POST /v1/ingest/job-status : minimal heartbeat/status updates

Dashboards

  • GET /v1/usage/daily?deployment_id=...&from=...&to=...

  • GET /v1/limits/effective?deployment_id=... (license + quotas merged)

  • GET /v1/alerts?deployment_id=...

  • GET /v1/suggestions?deployment_id=...

Admin

  • POST /v1/quota (create/update)

  • POST /v1/budget

  • POST /v1/license/activate (uploads vendor-signed blob)

  • POST /v1/tokens (issue/rotate ingest tokens)

Auth: local mTLS + Authorization: Bearer <token>; all responses cacheable 60s.


6) Deployment architecture (on-prem first)

  • KACHING Core (Go/Rust service)

    • HTTP ingest + API

    • Aggregator worker

    • Scheduler (rollups, alerts)

    • Optional “HQ Sync” module (disabled by default)

  • State

    • PostgreSQL 15+ (enable pg_partman / native range partitioning)

    • Redis/NATS for event buffer (optional; fall back to COPY-on-commit if absent)

  • Packaging

    • Systemd unit or Docker Compose

    • Helm chart for k8s clusters (nodeSelector for DB locality)

  • Resilience

    • Backpressure: agents buffer to disk (bounded queue) if ingest is unavailable

    • Idempotent writes via correlation_id

    • Daily VACUUM/ANALYZE; weekly REFRESH MATERIALIZED VIEW mv_daily_activity


7) Retention & performance

  • Raw usage_sample: 1430 days hot; downsample into rollups; archive to parquet on local object storage (MinIO) monthly.

  • Rollups: keep 24 months.

  • Partition keys: usage_sample.observed_at (daily), usage_rollup_hourly.bucket_start (daily).

  • Use BRIN indexes for time partitions; btree for FK lookups.


8) License enforcement (light-touch)

  • Signed license (license.signed_blob) includes: plan id, seat/node caps, expiry, feature bitset, signature.

  • Enforce soft by default (warn + suggest), hard only when explicitly configured (enterprise asks).

  • Local-only check; never phones home unless HQ Sync is enabled.

Prove governance, not just spend. KACHING rolls up per-job context ops, model tokens, and concurrency into org-level signals, compares against quotas/budgets, and emits policy actions (warn/throttle/block/fallback). Thiss how we enforce runtime guardrails in a way boards and auditors can verify.


9) Budget actions (runtime knobs CHORUS can use)

When KACHING emits BUDGET_EXCEEDED with action:

  • warn: post in CHORUS banner.

  • throttle: cap orchestration concurrency N.

  • block: reject new jobs in that scope.

  • fallback_model: instruct agent to switch to cheaper local model; pass a policy_decision payload back via CHORUS control channel.


10) UX hooks (what users see)

  • Tier Utilization card: seats, nodes, temporal queries, context volume; sparkline + % of cap.

  • Bottleneck callouts: “Temporal queries blocked 3× this week.”

  • Clear upgrade CTA: shows concrete deltas (“+3× context window, +multi-site federation”).


11) Minimal agent SDK shim

Config (YAML)

kaching:
  ingest_url: https://kaching.local/v1/ingest/usage
  token: ${KACHING_TOKEN}
  key_id: agentkey-02
  key_path: /etc/chorus/agent_ed25519
  flush_interval_ms: 2000
  max_batch: 200
  dedupe_key: correlation_id
  redact:
    ucxl_addr_segments: ["credentials", "secrets"]

Agent integration (pseudo-code)

with Job("ucxl://.../report.md") as job:
    # ... do work ...
    kaching.emit_usage(job_id=job.id,
                       cpu_seconds=cpu,
                       gpu_seconds=gpu,
                       context_reads=reads,
                       context_writes=writes,
                       model_name=model,
                       tokens_in=tin, tokens_out=tout)

12) What this unlocks for pricing

  • Flat annual licenses remain clean because you can prove usage & growth.

  • Automated, evidence-based upsells (“youre consistently at 8895% of context cap; Business tier gives +3× headroom and federation you already need”).

  • If you ever add paid add-ons (e.g., optional hosted LLM fallback), the hooks (model tokens, provider) are already in the schema.


13) Implementation order (2-week sprint)

  1. DDL + partitioning + ingest auth (ed25519, token hash).

  2. Ingest API + idempotent writes + agent shim.

  3. Hourly/daily rollups + three alerts (QUOTA_NEARING, BUDGET_EXCEEDED, NODES_LIMIT_EXCEEDED).

  4. Suggestion generator v1 (map to your three ladders).

  5. Minimal web UI cards (tier utilization, alerts, CTA).

  6. Optional: HQ Sync (batched, redacted).


KACHING on Google Cloud a tight, secure, hybrid telemetry platform that your on-prem CHORUS installs can push into. Below is a practical blueprint you can build from today: services, IAM, network, data path, OpenAPI, and a Go reference skeleton.

KACHING on GCP: production architecture

Core services (minimal, proven stack)

  • Cloud Run (fully managed)

    • kaching-ingest (public endpoint, locked by mutual auth + signed payloads)

    • kaching-api (dashboards/admin; private behind IAP)

    • kaching-aggregator (background rollups; also run as Cloud Run jobs)

  • Pub/Sub

    • usage-events (raw events)

    • dead-letter-usage (DLQ with retry policy)

  • Cloud SQL for PostgreSQL 15+ (primary persistence; point-in-time recovery enabled)

  • Cloud Storage

    • kaching-archive (monthly Parquet archives; optional customer exports)
  • BigQuery

    • kaching_analytics dataset (hourly/daily rollups mirrored for analytics & Looker)
  • Secret Manager (ingest tokens, per-org agent public keys, DB creds)

  • Cloud KMS (CMEK for Cloud SQL, GCS, Pub/Sub; license signing keys)

  • Cloud Scheduler + Cloud Run jobs (hourly/daily rollup + archive tasks)

  • Cloud Logging + Cloud Monitoring (SLIs/SLOs; alerting policies)

  • Cloud Armor (WAF in front of HTTPS LB to Cloud Run)

  • Identity-Aware Proxy (IAP) (protect kaching-api admin UI)

  • VPC + Serverless VPC Access (Cloud Run ↔ Cloud SQL private IP)

Optional (later): Apigee if you need enterprise API governance; Dataflow if rollups outgrow SQL.


Data flow (event path)

Telemetry is signed + idempotent (ed25519, correlation IDs) and multi-tenant isolated (RLS/KMS/CMEK); redacted rollups can sync to HQ for cross-deployment analytics.

  1. On-prem agent completes a job → emits signed kaching.v1.usage JSON over HTTPS to kaching-ingest.

    • Auth: Authorization: Bearer <ingest_token> (hash validated)

    • Integrity: detached Ed25519 signature over canonical JSON (agent key registered per org)

    • Idempotency: job.correlation_id

  2. kaching-ingest performs fast stateless checks → pushes message to Pub/Sub usage-events (attributes include org_id, deployment_id, schema_version, event_ts).

  3. kaching-aggregator (Pub/Sub push/pull)

    • Writes raw to usage_sample/job in Cloud SQL (in txn)

    • Maintains hourly/daily rollups (SQL upserts)

    • Evaluates quota/budget → inserts alert/suggestion

    • Mirrors fresh rollups to BigQuery (streaming inserts or 5-min batch)

  4. Cloud Run job (hourly, daily)

    • VACUUM/ANALYZE hot partitions

    • Export prior month raw to GCS Parquet

    • Advance materialized views

  5. kaching-api serves dashboards (IAP-protected) + admin endpoints.


Multi-tenancy & data isolation

  • All rows keyed by org_id + deployment_id; enforce RLS (Row Level Security) in PostgreSQL for any shared read paths.

  • Each org has a KMS-wrapped secret namespace (ingest tokens, agent public keys).

  • CMEK on Cloud SQL & GCS; per-org key rings if you need tenant keying.


Security model (pragmatic & strong)

  • Ingress: HTTPS LB → Cloud Armor → Cloud Run (kaching-ingest).

  • mTLS (optional): If you control agent certs, terminate mTLS at the LB; otherwise rely on:

    • Bearer ingest token (DB-stored bcrypt/argon2 hash)

    • Ed25519 signature (payload integrity, replay window ≤ 5 min)

    • Idempotency via (org_id, correlation_id) unique index

  • Secrets: only via Secret Manager; short-lived Cloud SQL IAM tokens (no static passwords in code).

  • Least-privilege IAM service accounts (see below).


IAM layout (service accounts & roles)

  • sa-kaching-ingest

    • roles/pubsub.publisher (to usage-events)

    • roles/secretmanager.secretAccessor (read token pepper + org pubkeys)

  • sa-kaching-aggregator

    • roles/cloudsql.client, roles/secretmanager.secretAccessor

    • roles/pubsub.subscriber (from usage-events)

    • roles/storage.objectAdmin (to write Parquet archives)

    • roles/bigquery.dataEditor (analytics dataset)

  • sa-kaching-api

    • roles/cloudsql.client, roles/secretmanager.secretAccessor

    • Protected by IAP; org admins authenticated via Google Identity

  • sa-kaching-scheduler

    • Invokes Cloud Run jobs; minimal runner roles
  • KMS: grant each SA cryptoKeyEncrypterDecrypter on CMEK keys.


Database schema

Use the schema we defined earlier (orgs, deployments, nodes, agents, jobs, usage_sample, rollups, license, quota, budget, alert, suggestion, tokens, etc.) unchanged—it fits Cloud SQL.
Add:

-- RLS example (read-only API role sees only its org)
alter table usage_rollup_daily enable row level security;
create policy org_isolation on usage_rollup_daily
  using (org_id = current_setting('app.current_org')::uuid);

Your API sets set app.current_org = '<org-uuid>'; after auth.


OpenAPI (ingest + suggestions)

openapi: 3.0.3
info:
  title: KACHING API
  version: 1.0.0
servers:
  - url: https://ingest.kaching.yourdomain.com
paths:
  /v1/ingest/usage:
    post:
      security: [{ IngestToken: [] }]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/KachingUsageV1'
      responses:
        '202': { description: accepted }
        '400': { description: bad request }
        '401': { description: unauthorized }
        '409': { description: duplicate (idempotent) }
  /v1/suggestions:
    get:
      security: [{ IAPAuth: [] }]
      parameters:
        - in: query; name: deployment_id; required: true; schema: { type: string, format: uuid }
      responses:
        '200':
          description: list
          content:
            application/json:
              schema:
                type: array
                items: { $ref: '#/components/schemas/Suggestion' }
components:
  securitySchemes:
    IngestToken:
      type: http
      scheme: bearer
    IAPAuth:
      type: http
      scheme: bearer
  schemas:
    KachingUsageV1:
      type: object
      required: [schema, deployment_id, agent, node, job, metrics, sig]
      properties:
        schema: { type: string, enum: ["kaching.v1.usage"] }
        deployment_id: { type: string, format: uuid }
        agent: { type: object, required: [agent_id, agent_type] }
        node: { type: object, required: [node_id, hostname] }
        job: { type: object, required: [job_id, ucxl_addr, started_at, status] }
        metrics: { type: object }
        llm: { type: object }
        sig:
          type: object
          required: [algo, key_id, signature]
          properties:
            algo: { type: string, enum: ["ed25519"] }
            key_id: { type: string }
            signature: { type: string } # base64
    Suggestion:
      type: object
      properties:
        suggestion_id: { type: string, format: uuid }
        kind: { type: string }
        rationale: { type: string }
        target_plan: { type: string }
        diffs: { type: object }

Go reference: Cloud Run ingest handler (concise skeleton)

package main

import (
  "context"
  "crypto/ed25519"
  "crypto/subtle"
  "encoding/base64"
  "encoding/json"
  "log"
  "net/http"
  "os"
  "time"

  "cloud.google.com/go/pubsub"
)

type Usage struct {
  Schema       string `json:"schema"`
  DeploymentID string `json:"deployment_id"`
  Agent        struct {
    AgentID   string `json:"agent_id"`
    AgentType string `json:"agent_type"`
    Version   string `json:"version"`
  } `json:"agent"`
  Node struct {
    NodeID   string `json:"node_id"`
    Hostname string `json:"hostname"`
    HwClass  string `json:"hw_class"`
  } `json:"node"`
  Job struct {
    JobID         string    `json:"job_id"`
    CorrelationID string    `json:"correlation_id"`
    UCXLAddr      string    `json:"ucxl_addr"`
    StartedAt     time.Time `json:"started_at"`
    EndedAt       *time.Time `json:"ended_at"`
    Status        string    `json:"status"`
    Meta          any       `json:"meta"`
  } `json:"job"`
  Metrics any `json:"metrics"`
  LLM     any `json:"llm"`
  Sig struct {
    Algo      string `json:"algo"`
    KeyID     string `json:"key_id"`
    Signature string `json:"signature"`
  } `json:"sig"`
}

var (
  topic *pubsub.Topic
)

func main() {
  ctx := context.Background()
  projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
  topicName := os.Getenv("PUBSUB_TOPIC") // usage-events
  client, err := pubsub.NewClient(ctx, projectID)
  if err != nil { log.Fatal(err) }
  topic = client.Topic(topicName)
  http.HandleFunc("/v1/ingest/usage", handleIngest)
  log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleIngest(w http.ResponseWriter, r *http.Request) {
  // 1) Auth: bearer token
  tok := r.Header.Get("Authorization")
  if !validateIngestToken(tok) {
    http.Error(w, "unauthorized", http.StatusUnauthorized); return
  }
  // 2) Parse
  var u Usage
  dec := json.NewDecoder(r.Body)
  dec.DisallowUnknownFields()
  if err := dec.Decode(&u); err != nil {
    http.Error(w, "bad json", http.StatusBadRequest); return
  }
  if u.Schema != "kaching.v1.usage" {
    http.Error(w, "bad schema", http.StatusBadRequest); return
  }
  // 3) Verify signature over canonical JSON (client must sign without "sig")
  //    For brevity assume client also sends "X-Payload-Hash" header we check here.
  if !verifySignature(r.Context(), u) {
    http.Error(w, "bad signature", http.StatusUnauthorized); return
  }
  // 4) Publish to Pub/Sub with ordering key = deployment_id
  b, _ := json.Marshal(u)
  res := topic.Publish(r.Context(), &pubsub.Message{
    Data:        b,
    OrderingKey: u.DeploymentID,
    Attributes: map[string]string{
      "deployment_id": u.DeploymentID,
      "agent_type":    u.Agent.AgentType,
      "schema":        u.Schema,
    },
  })
  if _, err := res.Get(r.Context()); err != nil {
    http.Error(w, "queue error", http.StatusServiceUnavailable); return
  }
  w.WriteHeader(http.StatusAccepted)
}

func validateIngestToken(hdr string) bool {
  // Expect "Bearer abc"
  // Lookup hashed value in Secret Manager or Redis; constant-time compare
  want := os.Getenv("INGEST_TOKEN_HASH") // e.g., sha256 hex
  got := extractBearer(hdr)
  sum := sha256Hex(got)
  return subtle.ConstantTimeCompare([]byte(sum), []byte(want)) == 1
}

func verifySignature(ctx context.Context, u Usage) bool {
  // Lookup org/deployment → key_id → ed25519 public key (Secret Manager)
  pub := fetchEd25519Pub(ctx, u.Sig.KeyID)
  sigBytes, _ := base64.StdEncoding.DecodeString(u.Sig.Signature)
  // canonicalize payload without Sig; omitted here for brevity
  payload := canonicalJSON(u, /*excludeSig=*/true)
  return ed25519.Verify(pub, payload, sigBytes)
}

Notes
• Enable ordered delivery on Pub/Sub if you need strict per-deployment ordering.
• For signature canonicalization, fix field order and whitespace (e.g., RFC 8785 JSON Canonicalization Scheme).


Rollups & thresholds (Cloud Run job)

  • Hourly job:

    • UPSERT into usage_rollup_hourly grouped by (deployment_id, bucket_start, agent_type)

    • Evaluate soft/hard quotas; write alert/suggestion

    • Stream hourly snapshot into BigQuery (for Looker dashboards)

  • Daily job:

    • UPSERT usage_rollup_daily

    • Export aging usage_sample partitions → GCS Parquet

    • VACUUM/ANALYZE current partitions

    • Refresh materialized views

Run both with Cloud SchedulerCloud Run jobs (retries on failure).


Networking & regions

  • Put everything in one primary region close to your customers (e.g., australia-southeast1 for you).

  • Turn on PITR for Cloud SQL; add read replica in nearby region if needed.

  • If you require egress locality for data sovereignty, pin GCS and BigQuery datasets to the same region and enforce org-scoped KMS keys.


Observability & SLOs

  • SLI ideas:

    • Ingest success rate ≥ 99.9% (5-min windows)

    • Pub/Sub → DB end-to-end latency p95 < 2 min

    • Aggregator error rate < 0.1%

  • Dashboards (Cloud Monitoring):

    • Requests by status, Pub/Sub undelivered messages, Cloud Run CPU/mem, Cloud SQL CPU/IO wait
  • Alerts:

    • Pub/Sub backlog > threshold 10 min

    • Cloud Run 5xx > 1% for 5 min

    • SQL connection errors burst


Terraform pointers (resource sketch)

resource "google_pubsub_topic" "usage" {
  name                        = "usage-events"
  message_retention_duration  = "604800s"
  kms_key_name                = google_kms_crypto_key.pubsub.id
}

resource "google_sql_database_instance" "pg" {
  name             = "kaching-pg"
  database_version = "POSTGRES_15"
  region           = "australia-southeast1"
  settings {
    tier = "db-custom-2-7680" # pick to taste
    backup_configuration { point_in_time_recovery_enabled = true }
    ip_configuration { private_network = google_compute_network.vpc.id }
    encryption_key_name = google_kms_crypto_key.sql.id
  }
}

resource "google_cloud_run_service" "ingest" {
  name     = "kaching-ingest"
  location = "australia-southeast1"
  template {
    spec {
      containers {
        image = "gcr.io/PROJECT/kaching-ingest:latest"
        env { name = "PUBSUB_TOPIC" value = google_pubsub_topic.usage.name }
      }
      service_account_name = google_service_account.ingest.email
    }
  }
}

License issuance on GCP

  • Keep a vendor signing key in Cloud KMS; sign a compact license JWT or binary blob containing: org_id, plan_id, valid_from/to, feature bitset, seat/node caps.

  • kaching-api validates licenses locally with the public key; no call-home needed.

  • Store customer licenses in Cloud SQL; deliver file via your portal.


Migration path from pure on-prem KACHING

  • Start with this hosted KACHING; if a customer requires air-gapped, deploy the same services via GKE-on-prem or a single-VM bundle (Postgres + NATS + the same services).

  • Keep the event schema 1:1 so both modes are supported.


What to build first (so you can ship)

  1. Ingest (Cloud Run) + Pub/Sub + aggregator writing to Cloud SQL (raw + hourly).

  2. Threshold engine (soft quota → alert + suggestion) mapped to your three price ladders.

  3. API read endpoints + IAP-protected minimal UI tiles: tier utilisation, recent alerts, suggested upgrade.

  4. Daily job to Parquet export + Looker Studio over BigQuery for internal analytics.

TODO

  • Ingestion gateways: Implement consistent event schema and authenticated ingestion across CHORUS/WHOOSH/RUSTLE/SLURP; add collectors per agent type.
  • Policy enforcement: Apply license/plan/quotas in orchestration paths (WHOOSH) with alerts and upgrade nudges.
  • Rollups to HQ: Optional redacted rollups push with privacy controls; add unit/integration tests.
  • Dashboard completeness: Wire cross-org/deployment/node views to live telemetry with filtering and drill-down.