Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/LokiKV.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ path = "src/cli/main.rs"

[dependencies]
bincode = "1.3.3"
bit-set = "0.8.0"
clap = { version = "4.5.17", features = ["derive"] }
pest = "2.6"
pest_derive = "2.6"
rayon = "1.10.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shlex = "1.3.0"
tokio = { version = "1.41.1", features = ["full"] }
pest = "2.6"
pest_derive = "2.6"
bit-set = "0.8.0"
10 changes: 9 additions & 1 deletion src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,18 @@ fn main() {
}

// println!("Writing to stream: {}", buf);
if let Err(e) = writer.write_all(buf.as_bytes()) {
let json_command = format!(r#"{{"query": "{}"}}"#, buf.trim());

if let Err(e) = writer.write_all(json_command.as_bytes()) {
eprintln!("Failed to send command: {}", e);
break;
}

// Ensure a newline is sent for proper deserialization
if let Err(e) = writer.write_all(b"\n") {
eprintln!("Failed to send newline: {}", e);
break;
}
// println!("Written to stream!");

// println!("Checking response....");
Expand Down
11 changes: 11 additions & 0 deletions src/db/server_multithread/deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::Deserialize;
use serde_json::{self, Result};

#[derive(Debug, Deserialize)]
pub struct Request {
pub query: String,
}

pub fn deserialize(input: &str) -> Result<Request> {
serde_json::from_str(input)
}
2 changes: 2 additions & 0 deletions src/db/server_multithread/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod server;
pub mod serializer;
pub mod deserializer;
6 changes: 6 additions & 0 deletions src/db/server_multithread/serializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use serde::Serialize;
use serde_json;

pub fn serialize<T: Serialize>(data: &T) -> Result<String, String> {
serde_json::to_string(data).map_err(|e| format!("Serialization error: {}", e))
}
98 changes: 53 additions & 45 deletions src/db/server_multithread/server.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
use crate::loki_kv::loki_kv::{LokiKV, ValueObject};
use crate::loki_kv::loki_kv::LokiKV;
use crate::parser::executor::Executor;
use crate::parser::parser::parse_lokiql;
use crate::server_multithread::serializer::serialize;
use crate::server_multithread::deserializer::deserialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
ops::{Deref, DerefMut},
sync::{Arc, RwLock},
};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
use tokio::io::{AsyncBufReadExt, BufReader, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
query: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Response {
result: Vec<String>,
}

// Server Logic
pub struct LokiServer {
Expand All @@ -19,7 +29,7 @@ pub struct LokiServer {
thread_count: usize,
db_instance: Arc<RwLock<LokiKV>>,
}
//

async fn handle_connection(
stream: TcpStream,
db_instance: Arc<RwLock<LokiKV>>,
Expand All @@ -38,59 +48,57 @@ async fn handle_connection(
}

let request_line = buf.trim().to_string();
// let request_line = String::from_utf8(buf[..n].to_vec())
// .map_err(|e| format!("Invalid UTF-8 data: {}", e))
// .unwrap();

println!("Got {:?}", request_line);

let asts = parse_lokiql(&request_line);
let mut ast_exector = Executor::new(db_instance.clone(), asts);
let responses = ast_exector.execute();
// Fix deserialization
match deserialize(&request_line) {
Ok(query) => {
println!("Executing query: {:?}", query);

let mut resp_str = String::new();
// Improve output result
for response in responses.iter() {
if let val = response {
resp_str += &format!("{:?}\n", val);
};
}
let asts = parse_lokiql(&query.query);
let mut ast_exector = Executor::new(db_instance.clone(), asts);
let responses = ast_exector.execute();

resp_str += "<END_OF_RESPONSE>\n";
println!("RESPONSE: {}", resp_str);
let _ = wr.write_all(resp_str.as_bytes()).await;
let _ = wr.flush().await;
// println!("Wrote bytes {}", resp_str);
let mut resp_str = String::new();
for response in responses.iter() {
if let val = response {
resp_str += &format!("{:?}\n", val);
};
}

resp_str += "<END_OF_RESPONSE>\n";
println!("RESPONSE: {}", resp_str);
let _ = wr.write_all(resp_str.as_bytes()).await;
let _ = wr.flush().await;
}
Err(e) => {
eprintln!("Deserialization error: {}", e);
let _ = wr.write_all(b"Deserialization error\n").await;
let _ = wr.flush().await;
}
}
}
}

impl LokiServer {
pub async fn new(host: String, port: u16, thread_count: usize) -> Self {
let addr = format!("{}:{}", host, port);
println!("Trying to start server at -> {}", addr);
let tcp_listener = TcpListener::bind(addr).await;
let tcp_listener = TcpListener::bind(addr).await.expect("Unable to create server");

match tcp_listener {
Ok(tcp_list) => {
println!("Started Sevrer at {}:{}", host, port);
let db_instance = LokiKV::new();
LokiServer {
tcp_listener: tcp_list,
host,
port,
thread_count,
db_instance: Arc::new(RwLock::new(db_instance)),
}
}
Err(_) => {
panic!("Unable to Create new Server at {}:{}", host, port);
}
println!("Started Server at {}:{}", host, port);
let db_instance = LokiKV::new();
LokiServer {
tcp_listener,
host,
port,
thread_count,
db_instance: Arc::new(RwLock::new(db_instance)),
}
}

pub async fn start_event_loop(&mut self) {
loop {
// println!("HII!!");
match self.tcp_listener.accept().await {
Ok((socket, _)) => {
let db = self.db_instance.clone();
Expand All @@ -101,7 +109,7 @@ impl LokiServer {
}
});
}
_ => panic!("error accepting connection"),
Err(e) => eprintln!("Error accepting connection: {}", e),
};
}
}
Expand Down