Optimistic Concurrency Control
The "wrong version" check prevents two concurrent writes from conflicting:
async fn append_event(
pool: &PgPool,
stream_id: &str,
expected_version: i64,
event: &Event,
) -> Result<(), ConcurrencyError> {
let result = sqlx::query!(
r#"
INSERT INTO events (stream_id, stream_seq, event_type, data)
VALUES ($1,
(SELECT COALESCE(MAX(stream_seq), -1) + 1
FROM events WHERE stream_id = $1),
$2, $3)
WHERE (SELECT COALESCE(MAX(stream_seq), -1) FROM events WHERE stream_id = $1) = $4
"#,
stream_id, event.event_type, event.data, expected_version
)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
Err(ConcurrencyError::WrongVersion)
} else {
Ok(())
}
}