Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,095 changes: 328 additions & 767 deletions crates/ember-core/src/shard/mod.rs

Large diffs are not rendered by default.

1,412 changes: 188 additions & 1,224 deletions crates/ember-protocol/src/command/table.rs

Large diffs are not rendered by default.

126 changes: 36 additions & 90 deletions crates/ember-server/src/connection/exec/hashes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Hash command handlers.

use bytes::Bytes;
use ember_core::{ShardRequest, ShardResponse, Value};
use ember_core::{ShardRequest, ShardResponse};
use ember_protocol::Frame;

use super::ExecCtx;
Expand All @@ -12,50 +12,37 @@ pub(in crate::connection) async fn hset(
cx: &ExecCtx<'_>,
) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HSet {
key: key.clone(),
fields,
};
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Len(n)) => {
cx.notify_write(crate::keyspace_notifications::FLAG_H, "hset", &key);
Frame::Integer(n as i64)
}
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(ShardResponse::OutOfMemory) => super::oom_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
let notify_key = key.clone();
let req = ShardRequest::HSet { key, fields };
super::route_to_shard(cx, idx, req, |resp| {
let frame = super::resp_len(resp);
cx.notify_write(crate::keyspace_notifications::FLAG_H, "hset", &notify_key);
frame
})
.await
}

pub(in crate::connection) async fn hget(key: String, field: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HGet { key, field };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Value(Some(Value::String(data)))) => Frame::Bulk(data),
Ok(ShardResponse::Value(None)) => Frame::Null,
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_string_value).await
}

pub(in crate::connection) async fn hgetall(key: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HGetAll { key };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::HashFields(fields)) => {
super::route_to_shard(cx, idx, req, |resp| match resp {
ShardResponse::HashFields(fields) => {
let mut frames = Vec::with_capacity(fields.len() * 2);
for (field, value) in fields {
frames.push(Frame::Bulk(Bytes::from(field)));
frames.push(Frame::Bulk(value));
}
Frame::Array(frames)
}
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
other => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
})
.await
}

pub(in crate::connection) async fn hdel(
Expand All @@ -65,34 +52,23 @@ pub(in crate::connection) async fn hdel(
) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HDel { key, fields };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::HDelLen { count, .. }) => Frame::Integer(count as i64),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, |resp| match resp {
ShardResponse::HDelLen { count, .. } => Frame::Integer(count as i64),
other => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
})
.await
}

pub(in crate::connection) async fn hexists(key: String, field: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HExists { key, field };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Bool(b)) => Frame::Integer(if b { 1 } else { 0 }),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_bool_int).await
}

pub(in crate::connection) async fn hlen(key: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HLen { key };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Len(n)) => Frame::Integer(n as i64),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_len).await
}

pub(in crate::connection) async fn hincrby(
Expand All @@ -103,14 +79,7 @@ pub(in crate::connection) async fn hincrby(
) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HIncrBy { key, field, delta };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Integer(n)) => Frame::Integer(n),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(ShardResponse::OutOfMemory) => super::oom_error(),
Ok(ShardResponse::Err(msg)) => Frame::Error(format!("ERR {msg}")),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_integer).await
}

pub(in crate::connection) async fn hincrbyfloat(
Expand All @@ -121,40 +90,19 @@ pub(in crate::connection) async fn hincrbyfloat(
) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HIncrByFloat { key, field, delta };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::BulkString(val)) => Frame::Bulk(Bytes::from(val)),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(ShardResponse::OutOfMemory) => super::oom_error(),
Ok(ShardResponse::Err(msg)) => Frame::Error(msg),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_bulk_string).await
}

pub(in crate::connection) async fn hkeys(key: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HKeys { key };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::StringArray(keys)) => Frame::Array(
keys.into_iter()
.map(|k| Frame::Bulk(Bytes::from(k)))
.collect(),
),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_string_array).await
}

pub(in crate::connection) async fn hvals(key: String, cx: &ExecCtx<'_>) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HVals { key };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::Array(vals)) => Frame::Array(vals.into_iter().map(Frame::Bulk).collect()),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
super::route_to_shard(cx, idx, req, super::resp_bulk_array).await
}

pub(in crate::connection) async fn hmget(
Expand All @@ -164,19 +112,18 @@ pub(in crate::connection) async fn hmget(
) -> Frame {
let idx = cx.engine.shard_for_key(&key);
let req = ShardRequest::HMGet { key, fields };
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::OptionalArray(vals)) => Frame::Array(
super::route_to_shard(cx, idx, req, |resp| match resp {
ShardResponse::OptionalArray(vals) => Frame::Array(
vals.into_iter()
.map(|v| match v {
Some(data) => Frame::Bulk(data),
None => Frame::Null,
})
.collect(),
),
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
other => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
})
.await
}

pub(in crate::connection) async fn hrandfield(
Expand All @@ -191,8 +138,8 @@ pub(in crate::connection) async fn hrandfield(
count,
with_values,
};
match cx.engine.send_to_shard(idx, req).await {
Ok(ShardResponse::HRandFieldResult(pairs)) => {
super::route_to_shard(cx, idx, req, |resp| match resp {
ShardResponse::HRandFieldResult(pairs) => {
if count.is_none() {
// no count: return a single bulk string (or nil if empty)
match pairs.into_iter().next() {
Expand All @@ -214,10 +161,9 @@ pub(in crate::connection) async fn hrandfield(
Frame::Array(frames)
}
}
Ok(ShardResponse::WrongType) => super::wrongtype_error(),
Ok(other) => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
Err(e) => Frame::Error(format!("ERR {e}")),
}
other => Frame::Error(format!("ERR unexpected shard response: {other:?}")),
})
.await
}

pub(in crate::connection) async fn hscan(
Expand Down
Loading