So far, writing, reading, and deleting each of the 15,840 entries of a 50 MB JSON file finishes in 86.7 ms, which seems good enough. That works out to roughly 5.5 μs per entry, which by the rule of thumb in

https://ryhl.io/blog/async-what-is-blocking/

is small enough to be considered non-blocking:

To give a sense of scale of how much time is too much, a good rule of thumb is no more than 10 to 100 microseconds between each .await. That said, this depends on the kind of application you are writing.
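The 5.5 μs figure above is simply the mean wall time divided by the entry count; a quick sanity check with the numbers as quoted:

```rust
fn main() {
    // 86.7 ms wall time for 15,840 entries (each written, read, and deleted)
    let total_ms = 86.7_f64;
    let entries = 15_840.0_f64;
    let per_entry_us = total_ms * 1_000.0 / entries; // ms -> μs
    assert!((per_entry_us - 5.5).abs() < 0.1);
    println!("{per_entry_us:.1} μs per entry");
}
```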

Of course, real data can be larger, so in practice the real thing will still get proper async treatment.

Once I actually start on this I should probably be far more rigorous, but at least I'm comparing actual numbers...

Current comparison results:

Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):     514.7 ms ±   2.2 ms    [User: 180.8 ms, System: 738.8 ms]
  Range (min … max):   511.7 ms … 526.5 ms    50 runs

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     103.8 ms ±   2.6 ms    [User: 192.6 ms, System: 569.2 ms]
  Range (min … max):    97.8 ms … 112.0 ms    50 runs

Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     140.8 ms ±   0.9 ms    [User: 136.1 ms, System: 3.2 ms]
  Range (min … max):   139.2 ms … 143.0 ms    50 runs

Summary
  ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb ran
    1.36 ± 0.03 times faster than ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
    4.96 ± 0.12 times faster than ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis

testing script

hyperfine --warmup 3 '~/code/sandbox/databases/db_test_redis/target/release/db_test_redis' '~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb' '~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite'

single thread

Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):      1.010 s ±  0.020 s    [User: 0.106 s, System: 0.275 s]
  Range (min … max):    0.970 s …  1.029 s    10 runs

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     181.5 ms ±  10.5 ms    [User: 117.8 ms, System: 72.2 ms]
  Range (min … max):   174.1 ms … 212.0 ms    16 runs

Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     42.610 s ±  7.241 s    [User: 0.471 s, System: 6.406 s]
  Range (min … max):   33.006 s … 54.726 s    10 runs

Summary
  ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb ran
    5.57 ± 0.34 times faster than ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
    234.73 ± 42.15 times faster than ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite

That said, if sqlite is opened with Connection::open_in_memory(), it comes out on top, though of course that also makes it volatile:

Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     109.0 ms ±   1.3 ms    [User: 104.2 ms, System: 3.3 ms]
  Range (min … max):   106.8 ms … 112.9 ms    27 runs

The trade-off is that once sqlite runs in memory, you have to start thinking about

  • a backup policy
  • reloading the data on restart

and at that point you might as well just keep everything in a HashMap. The reason to use redis is that, even though it lives in memory, it can still persist data non-volatilely (backups and so on).
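The HashMap route mentioned above really is trivially small; a minimal std-only sketch (not any of the benchmarked code) with the same set/get/del shape:

```rust
use std::collections::HashMap;

fn main() {
    // In-process key-value store: fast, but volatile (gone on restart).
    let mut store: HashMap<String, Vec<u8>> = HashMap::new();

    // set
    store.insert("p0".to_string(), b"{\"name\":\"alice\"}".to_vec());
    // get
    let bytes = store.get("p0").cloned().unwrap();
    assert_eq!(bytes, b"{\"name\":\"alice\"}");
    // del
    store.remove("p0");
    assert!(store.get("p0").is_none());
}
```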

One more thing that matters operationally is an admin UI:

db        redis   rocksdb   sqlite   sqlite (in memory)
speed     3rd     2nd       4th      1st
volatile  false   false     false    true
admin     true    false     true     false

Seen this way, redis is not the fastest, but it is reasonably good without requiring much thought.

I did think sqlite would be fine, though...

multithread (horizontal scaling)

redis

Since redis was at an unfair disadvantage, I made it multithreaded and gave it multiple connections.

Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):     390.2 ms ±   5.4 ms    [User: 101.0 ms, System: 480.3 ms]
  Range (min … max):   384.9 ms … 404.0 ms    10 runs

That gave a speedup of a little under 3x.

Because it is multithreaded, system time now exceeds wall clock time (as expected).

It still does not catch up with sqlite or rocksdb. At about 24 μs per entry, using this from an async environment would mean resorting to block_on.

rocksdb

I made rocksdb multithreaded in the same way. As in the example above, there is one connection per database, but I used column families so that writes stay atomic.

Here the split goes further, down to how the data is stored: sharding. According to the docs, writes are already atomic per column family, so splitting into separate databases shouldn't be necessary... oh well.
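The routing behind this sharding is nothing more than `id % NUM_THREADS`; a std-only sketch showing that every operation on a given id lands on the same shard (16 shards assumed, matching the code below):

```rust
const NUM_THREADS: usize = 16;

// A key's shard is a pure function of its id, so the read and the
// delete always hit the same db/column family as the original write.
fn shard_of(id: usize) -> usize {
    id % NUM_THREADS
}

fn main() {
    assert_eq!(shard_of(0), 0);
    assert_eq!(shard_of(17), 1);
    // consecutive ids are spread round-robin across all 16 shards
    let hit: std::collections::HashSet<usize> = (0..100).map(shard_of).collect();
    assert_eq!(hit.len(), NUM_THREADS);
}
```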

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):      86.7 ms ±   3.0 ms    [User: 155.0 ms, System: 446.3 ms]
  Range (min … max):    82.1 ms …  94.8 ms    50 runs

Speed roughly doubled, and it overtook sqlite.

db        redis   rocksdb   sqlite   sqlite (in memory)
speed     3rd     1st       4th      2nd
volatile  false   false     false    true
admin     true    false     true     false

With this much sharding, maintenance is a headache (the admin UI in particular), but rocksdb may still be the rational choice compared with the other options.

Databases

redis

single thread

use redis::Commands;
use serde::{Serialize, Deserialize};
use std::{fs::File, io::Read};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64
}

const JSON_PATH: &'static str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf: String = String::new();
    // read json data;
    file.read_to_string(&mut buf).unwrap();

    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // create a db connection
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let mut con = client.get_connection().unwrap();

    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> =  serde_json::to_vec(&values[i]).unwrap();
        let _: () = con.set(key, &bytes).unwrap();
    }

    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = con.get(&key).unwrap();
        let v: People = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(&v, &values[i]);
    }

    for i in 0..values.len() {
        let key = format!("p{i}");
        let _: () = con.del(key).unwrap();
    }
}

multi thread

It suddenly gets harder.

A Mutex would have worked, but I decided to use channels.
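The request/response pattern itself needs nothing beyond the standard library; here is a minimal sketch where `std::sync::mpsc` plays both roles (the real code below uses the `crossbeam` and `oneshot` crates instead):

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Each request carries its own reply sender, so callers can collect
// responses in any order, just like the oneshot channels below.
struct Message {
    key: String,
    reply: mpsc::Sender<Option<String>>,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Message>();

    // Worker thread that owns the store (here just a HashMap).
    let worker = thread::spawn(move || {
        let store = HashMap::from([("p0".to_string(), "alice".to_string())]);
        while let Ok(mes) = rx.recv() {
            let _ = mes.reply.send(store.get(&mes.key).cloned());
        }
    });

    let (reply, response) = mpsc::channel();
    tx.send(Message { key: "p0".into(), reply }).unwrap();
    assert_eq!(response.recv().unwrap(), Some("alice".to_string()));

    drop(tx); // hang up; the worker's recv() loop ends
    worker.join().unwrap();
}
```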

use crossbeam::channel;
use oneshot;
use redis::Commands;
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

const NUM_THREADS: usize = 16;

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

enum MessageType {
    READ,
    WRITE,
    DEL,
}

enum Request {
    People((usize, People)),
    Id(usize),
}

enum Response {
    People(People),
    Ok,
}

struct Message {
    pub kind: MessageType,
    pub request: Request,
    pub response: oneshot::Sender<Response>,
}

impl Message {
    fn new(request: Request, kind: MessageType) -> (Self, oneshot::Receiver<Response>) {
        let (response, r) = oneshot::channel();
        let mes = Self {
            kind,
            response,
            request,
        };
        (mes, r)
    }

    fn new_read(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::READ)
    }

    fn new_write(id: usize, people: People) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::People((id, people)), MessageType::WRITE)
    }

    fn new_del(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::DEL)
    }
}

const JSON_PATH: &'static str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn create_connection_thread(client: &redis::Client) -> channel::Sender<Message> {
    // assuming a single redis client can hand out multiple connections
    let mut con = client.get_connection().unwrap();
    let (s, r): (channel::Sender<Message>, channel::Receiver<Message>) = channel::unbounded();

    std::thread::spawn(move || {
        while let Ok(mes) = r.recv() {
            match mes.kind {
                MessageType::READ => {
                    let Request::Id(i) = mes.request else {
                        continue;
                    };

                    let key = format!("p{i}");
                    let bytes: Vec<u8> = con.get(&key).unwrap();
                    let p: People = serde_json::from_slice(&bytes).unwrap();
                    let _ = mes.response.send(Response::People(p));
                }
                MessageType::WRITE => {
                    let Request::People((i, p)) = mes.request else {
                        continue;
                    };
                    let bytes: Vec<u8> = serde_json::to_vec(&p).unwrap();
                    let key = format!("p{i}");
                    let _: () = con.set(key, &bytes).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
                MessageType::DEL => {
                    let Request::Id(i) = mes.request else {
                        continue;
                    };
                    let key = format!("p{i}");
                    let _: () = con.del(key).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
            }
        }
    });

    return s;
}

fn main() {
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf: String = String::new();
    // read json data;
    file.read_to_string(&mut buf).unwrap();

    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // create a db connection
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();

    let connections = (0..NUM_THREADS)
        .map(|_i| create_connection_thread(&client))
        .collect::<Vec<_>>();

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_write(i, values[i].clone());
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for r in responses {
        r.recv().unwrap();
    }

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_read(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for (i, r) in responses.into_iter().enumerate() {
        let Response::People(p) = r.recv().unwrap() else {
            panic!();
        };
        assert_eq!(&p, &values[i]);
    }

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_del(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for r in responses {
        r.recv().unwrap();
    }
}

rocksdb

use serde::{Serialize, Deserialize};
use std::{fs::File, io::Read};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64
}

const JSON_PATH: &'static str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    let mut options = rocksdb::Options::default();
    options.set_error_if_exists(false);
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    let db_path: &str = "./tmp";
    let cfs = rocksdb::DB::list_cf(&options, db_path).unwrap_or(vec![]);
    // true when the "cf" column family has not been created yet
    let cf_missing = !cfs.iter().any(|cf| cf == "cf");

    let mut instance = rocksdb::DB::open_cf(&options, db_path, cfs).unwrap();

    if cf_missing {
        let options = rocksdb::Options::default();
        instance.create_cf("cf", &options).unwrap();
    }

    let cf = instance.cf_handle("cf").unwrap();

    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf: String = String::new();
    // read json data;
    file.read_to_string(&mut buf).unwrap();

    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> =  serde_json::to_vec(&values[i]).unwrap();
        let _: () = instance.put_cf(cf, &key,  &bytes).unwrap();
    }

    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = instance.get_cf(cf, &key).unwrap().unwrap();
        let v: People = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(&v, &values[i]);
    }

    for i in 0..values.len() {
        let key = format!("p{i}");
        // delete from the same column family the entries were written to
        let _: () = instance.delete_cf(cf, key).unwrap();
    }
}

multi thread

use crossbeam::channel;
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

const NUM_THREADS: usize = 16;
const DB_BASE_NAME: &str = "./tmp";
const CF_BASE_NAME: &str = "cf";

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

enum MessageType {
    READ,
    WRITE,
    DEL,
}

enum Request {
    People((usize, People)),
    Id(usize),
}

enum Response {
    People(People),
    Ok,
}

struct Message {
    pub kind: MessageType,
    pub request: Request,
    pub response: oneshot::Sender<Response>,
}

impl Message {
    fn new(request: Request, kind: MessageType) -> (Self, oneshot::Receiver<Response>) {
        let (response, r) = oneshot::channel();
        let mes = Self {
            kind,
            response,
            request,
        };
        (mes, r)
    }

    fn new_read(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::READ)
    }

    fn new_write(id: usize, people: People) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::People((id, people)), MessageType::WRITE)
    }

    fn new_del(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::DEL)
    }
}

const JSON_PATH: &'static str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn create_column_family_thread(options: &rocksdb::Options, id: usize) -> channel::Sender<Message> {
    let (s, r): (channel::Sender<Message>, channel::Receiver<Message>) = channel::unbounded();

    let cf_name = format!("{CF_BASE_NAME}{id}");
    let db_path = format!("{DB_BASE_NAME}{id}");
    let options = options.clone();

    std::thread::spawn(move || {
        let cfs = rocksdb::DB::list_cf(&options, &db_path).unwrap_or(vec![]);
        let cf_exists = cfs.iter().any(|cf| cf == &cf_name);

        // create_cf takes &mut self, so the handle must be mutable
        let mut instance = rocksdb::DB::open_cf(&options, db_path, cfs).unwrap();

        if !cf_exists {
            let options = rocksdb::Options::default();
            instance.create_cf(&cf_name, &options).unwrap();
        }

        let cf = instance.cf_handle(&cf_name).unwrap();
        while let Ok(mes) = r.recv() {
            match mes.kind {
                MessageType::READ => {
                    let Request::Id(i) = mes.request else {
                        continue;
                    };

                    let key = format!("p{i}");
                    let bytes: Vec<u8> = instance.get_cf(&cf, key).unwrap().unwrap();
                    let p: People = serde_json::from_slice(&bytes).unwrap();

                    let _ = mes.response.send(Response::People(p));
                }
                MessageType::WRITE => {
                    let Request::People((i, p)) = mes.request else {
                        continue;
                    };
                    let bytes: Vec<u8> = serde_json::to_vec(&p).unwrap();
                    let key = format!("p{i}");
                    let _: () = instance.put_cf(&cf, key, &bytes).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
                MessageType::DEL => {
                    let Request::Id(i) = mes.request else {
                        continue;
                    };
                    let key = format!("p{i}");
                    let _: () = instance.delete_cf(&cf, key).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
            }
        }
    });

    s
}

fn main() {
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf: String = String::new();
    // read json data;
    file.read_to_string(&mut buf).unwrap();

    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    let mut options = rocksdb::Options::default();
    options.set_error_if_exists(false);
    options.create_if_missing(true);
    options.set_write_buffer_size(16 << 30);
    options.create_missing_column_families(true);

    let connections = (0..NUM_THREADS)
        .map(|i| create_column_family_thread(&options, i))
        .collect::<Vec<_>>();

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_write(i, values[i].clone());
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for r in responses {
        r.recv().unwrap();
    }

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_read(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for (i, r) in responses.into_iter().enumerate() {
        let Response::People(p) = r.recv().unwrap() else {
            panic!();
        };
        assert_eq!(&p, &values[i]);
    }

    let mut responses = Vec::with_capacity(values.len());

    for i in 0..values.len() {
        let (mes, r) = Message::new_del(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }

    for r in responses {
        r.recv().unwrap();
    }
}

Column-family-level locking did work, but, presumably because the shared db handle is the bottleneck after all, it ended up slower:

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     218.4 ms ±  18.4 ms    [User: 333.9 ms, System: 400.0 ms]
  Range (min … max):   186.2 ms … 245.3 ms    15 runs

sqlite

use serde::{Serialize, Deserialize};
use serde_json::Value;
use std::{fs::File, io::Read};
use rusqlite::{Connection, params};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64
}

const JSON_PATH: &'static str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    // let con = Connection::open_in_memory().unwrap();
    let con = Connection::open("db.sqlite3").unwrap();

    let create = "create table if not exists people(id integer primary KEY, data json)";
    con.execute(create, ()).unwrap();

    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf: String = String::new();

    file.read_to_string(&mut buf).unwrap();

    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    for i in 0..values.len() {
        let string = serde_json::to_string(&values[i]).unwrap();
        let index = i + 1; // ids are 1-based
        let q = "insert into people (id, data) values(?1, ?2)";
        con.execute(q, (&index, &string)).unwrap();
    }

    for i in 0..values.len() {
        let index = i + 1;
        let mut statement = con.prepare("select * from people where id=?1").unwrap();

        let value: Value = statement.query_row(params![index], |r| r.get(1)).unwrap();
        let v: People = serde_json::from_value(value).unwrap();

        assert_eq!(&v, &values[i]);
    }

    for i in 0..values.len() {
        let index = i + 1;
        con.execute("delete from people where id=?1", params![index]).unwrap();
    }

    con.execute("drop table if exists people", []).unwrap();
}

Date: 2023-09-30 Sat 17:57