Implement chrs-mail: SQLite-based P2P messaging layer (Overstory-inspired)

This commit is contained in:
anthonyrawlins
2026-03-03 17:05:37 +11:00
parent 37d46c4bc3
commit b3c37825b2
2 changed files with 200 additions and 0 deletions

12
chrs-mail/Cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "chrs-mail"
version = "0.1.0"
edition = "2024"
[dependencies]
rusqlite = { version = "0.30", features = ["bundled"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.4", features = ["v4", "serde"] }

188
chrs-mail/src/lib.rs Normal file
View File

@@ -0,0 +1,188 @@
// chrs-mail library implementation
use std::path::Path;
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use uuid::Uuid;
/// Represents a mail message stored in the mailbox.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Message {
pub id: Uuid,
pub from_peer: String,
pub to_peer: String,
pub topic: String,
pub payload: JsonValue,
pub sent_at: DateTime<Utc>,
pub read_at: Option<DateTime<Utc>>,
}
/// Errors that can occur while using the Mailbox.
#[derive(Debug, Error)]
pub enum MailError {
#[error("SQLite error: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("JSON serialization error: {0}")]
Json(#[from] serde_json::Error),
#[error("UUID parsing error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Chrono parsing error: {0}")]
ChronoParse(#[from] chrono::ParseError),
}
/// Wrapper around a SQLite connection providing mail-box functionalities.
pub struct Mailbox {
conn: Connection,
}
impl Mailbox {
/// Open (or create) a mailbox database at `path`.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, MailError> {
let conn = Connection::open(path)?;
// Enable WAL mode.
conn.pragma_update(None, "journal_mode", &"WAL")?;
// Create table.
conn.execute(
"CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
from_peer TEXT NOT NULL,
to_peer TEXT NOT NULL,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
sent_at TEXT NOT NULL,
read_at TEXT
)",
[],
)?;
Ok(Self { conn })
}
/// Store a new message in the mailbox.
pub fn send(&self, msg: &Message) -> Result<(), MailError> {
let payload_str = serde_json::to_string(&msg.payload)?;
self.conn.execute(
"INSERT INTO messages (id, from_peer, to_peer, topic, payload, sent_at, read_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)",
params![
msg.id.to_string(),
&msg.from_peer,
&msg.to_peer,
&msg.topic,
payload_str,
msg.sent_at.to_rfc3339(),
],
)?;
Ok(())
}
/// Retrieve all unread messages addressed to `peer_id`.
pub fn receive_pending(&self, peer_id: &str) -> Result<Vec<Message>, MailError> {
let mut stmt = self.conn.prepare(
"SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at
FROM messages
WHERE to_peer = ?1 AND read_at IS NULL",
)?;
let rows = stmt.query_map(params![peer_id], |row| {
let id_str: String = row.get(0)?;
let from_peer: String = row.get(1)?;
let to_peer: String = row.get(2)?;
let topic: String = row.get(3)?;
let payload_str: String = row.get(4)?;
let sent_at_str: String = row.get(5)?;
let read_at_opt: Option<String> = row.get(6)?;
// Parse Uuid
let id = Uuid::parse_str(&id_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?;
// Parse JSON
let payload: JsonValue = serde_json::from_str(&payload_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
// Parse Timestamps
let sent_at = DateTime::parse_from_rfc3339(&sent_at_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?
.with_timezone(&Utc);
let read_at = match read_at_opt {
Some(s) => Some(
DateTime::parse_from_rfc3339(&s)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))?
.with_timezone(&Utc),
),
None => None,
};
Ok(Message {
id,
from_peer,
to_peer,
topic,
payload,
sent_at,
read_at,
})
})?;
let mut msgs = Vec::new();
for msg_res in rows {
msgs.push(msg_res?);
}
Ok(msgs)
}
/// Mark a message as read by setting its `read_at` timestamp.
pub fn mark_read(&self, msg_id: Uuid) -> Result<(), MailError> {
let now = Utc::now().to_rfc3339();
self.conn.execute(
"UPDATE messages SET read_at = ?1 WHERE id = ?2",
params![now, msg_id.to_string()],
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::fs;
fn temp_db_path() -> std::path::PathBuf {
let mut dir = env::temp_dir();
dir.push(format!("chrs_mail_test_{}.sqlite", Uuid::new_v4()));
dir
}
#[test]
fn roundtrip_send_and_receive() -> Result<(), MailError> {
let db_path = temp_db_path();
if db_path.exists() {
fs::remove_file(&db_path).unwrap();
}
let mailbox = Mailbox::open(&db_path)?;
let msg = Message {
id: Uuid::new_v4(),
from_peer: "alice".into(),
to_peer: "bob".into(),
topic: "greeting".into(),
payload: serde_json::json!({"text": "Hello"}),
sent_at: Utc::now(),
read_at: None,
};
mailbox.send(&msg)?;
let pending = mailbox.receive_pending("bob")?;
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id, msg.id);
mailbox.mark_read(msg.id)?;
let pending2 = mailbox.receive_pending("bob")?;
assert!(pending2.is_empty());
fs::remove_file(db_path).unwrap();
Ok(())
}
}