Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nginx_module/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
name = "nginx_module"
readme = "README.md"
repository = "https://github.com/g-Core/nginx-rust"
version = "0.1.4"
version = "0.1.5"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
164 changes: 104 additions & 60 deletions nginx_module/src/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use libc::{c_void, sockaddr_un};
use std::{
borrow::Cow, cell::RefCell, collections::VecDeque, marker::PhantomPinned, mem::MaybeUninit,
os::unix::ffi::OsStrExt, path::Path, pin::Pin,
};
use libc::{c_void, sockaddr_un};

use crate::log::ngx_log_error;
use crate::{
bindings::{
ngx_close_connection, ngx_connection_t, ngx_cycle, ngx_event_actions,
ngx_event_connect_peer, ngx_event_t, ngx_exiting, ngx_handle_read_event,
ngx_handle_write_event, ngx_log_t, ngx_peer_connection_t, ngx_quit, ngx_terminate,
NGX_AGAIN, NGX_RS_WRITE_EVENT,
NGX_AGAIN, NGX_LOG_ERR, NGX_RS_WRITE_EVENT,
},
ngx_event_add_timer, ngx_event_del_timer, NgxStr, NGX_OK,
};
Expand All @@ -27,7 +28,7 @@ enum State {
},
Disconnected {
event: Box<ngx_event_t>,
reconnect_timeout: usize
reconnect_timeout: usize,
},
}

Expand Down Expand Up @@ -90,22 +91,25 @@ impl UnixSocket {
{
ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS);
}
State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS }
State::Disconnected {
event: ev,
reconnect_timeout: MIN_TIMEOUT_MS,
}
},
}
}
None => unsafe {
let mut ev: Box<ngx_event_t> = Box::new(MaybeUninit::zeroed().assume_init());
ev.handler = Some(on_reconnect_timeout);
ev.log = (*ngx_cycle).log;
if (*ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0
{
ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS);
}
State::Disconnected { event: ev, reconnect_timeout: MIN_TIMEOUT_MS }
State::Disconnected {
event: ev,
reconnect_timeout: MIN_TIMEOUT_MS,
}
},
};

Expand Down Expand Up @@ -152,7 +156,10 @@ impl UnixSocket {
let mut dummy_ev: Box<ngx_event_t> = Box::new(MaybeUninit::zeroed().assume_init());
dummy_ev.handler = None;
dummy_ev.log = (*ngx_cycle).log;
*self.0.state.borrow_mut() = State::Disconnected { event: dummy_ev, reconnect_timeout: MIN_TIMEOUT_MS}
*self.0.state.borrow_mut() = State::Disconnected {
event: dummy_ev,
reconnect_timeout: MIN_TIMEOUT_MS,
}
}
}
}
Expand Down Expand Up @@ -286,12 +293,22 @@ impl Inner {

// Unsafe - self should come from a Pinned address
unsafe fn send(&self) {
let state = &mut *self.state.borrow_mut();
if let State::Connected { conn, buffers } = state {
if let Err(Disconnected) = unsafe { buffers.send(*conn) } {
// Use try_borrow_mut to avoid panic on re-entrant calls
let mut state = match self.state.try_borrow_mut() {
Ok(state) => state,
Err(error) => {
if let Ok(err_msg) = std::ffi::CString::new(format!("send: {}", error)) {
ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg);
}
return;
} // Already borrowed, skip this send attempt
};

if let State::Connected { conn, buffers } = &mut *state {
if let Err(Disconnected) = buffers.send(*conn) {
Comment thread
ruslanti marked this conversation as resolved.
*state = State::Disconnected {
event: self.create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
}
Expand All @@ -304,12 +321,8 @@ impl Inner {
ev.log = (*ngx_cycle).log;
ev.data = (self as *const Self as *mut Self).cast();

if (*ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
{
ngx_event_add_timer(&mut *ev, 0);
if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0 {
ngx_event_add_timer(&mut *ev, MIN_TIMEOUT_MS);
}

ev
Expand Down Expand Up @@ -341,7 +354,8 @@ impl WriteBuffers {
return Err(Disconnected);
}
BufferSendResult::Again => {
if ngx_quit == 0
if (*(*conn).write).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
{
Expand Down Expand Up @@ -557,50 +571,80 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
}

unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) {
// Add null pointer checks
if ev.is_null() {
return;
}

let data = (*ev).data as *const Inner;
if !data.is_null() {
let data = &*data;
let mut state = data.state.borrow_mut();
if let State::Disconnected { reconnect_timeout, .. } = &mut *state {
if *reconnect_timeout < TIMEOUT_MS {
*reconnect_timeout = (*reconnect_timeout) * 2
};
if let Some(conn) = State::try_connect(
&data.path,
&data.name,
&*data.dummy_log as *const ngx_log_t as *mut ngx_log_t,
) {
(*conn).data = (*ev).data;
(*(*conn).write).handler = Some(on_write);
(*(*conn).read).handler = Some(on_read);
if data.is_null() {
return;
}

let mut buffers = WriteBuffers::default();
buffers.push(&data.handshake_msg);
*state = match unsafe { buffers.send(conn) } {
Ok(()) => State::WaitServerHandshake { conn, buffers },
Err(_) => unsafe {
let mut ev: Box<ngx_event_t> =
Box::new(MaybeUninit::zeroed().assume_init());
ev.handler = Some(on_reconnect_timeout);
ev.log = (*ngx_cycle).log;
if (*ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
{
ngx_event_add_timer(&mut *ev, *reconnect_timeout);
}
let data = &*data;

State::Disconnected { event: ev, reconnect_timeout: *reconnect_timeout }
},
};
} else if (*ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
// Use try_borrow_mut to avoid panic if already borrowed
let mut state = match data.state.try_borrow_mut() {
Ok(state) => state,
Err(error) => {
if let Ok(err_msg) = std::ffi::CString::new(format!("on_reconnect_timeout: {}", error))
{
ngx_event_add_timer(ev, *reconnect_timeout);
ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg);
}
// State is already borrowed, schedule retry
if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0 {
ngx_event_add_timer(ev, MIN_TIMEOUT_MS);
}
return;
}
};

if let State::Disconnected {
reconnect_timeout, ..
} = &mut *state
{
if *reconnect_timeout < TIMEOUT_MS {
*reconnect_timeout = (*reconnect_timeout) * 2;
}

if let Some(conn) = State::try_connect(
&data.path,
&data.name,
&*data.dummy_log as *const ngx_log_t as *mut ngx_log_t,
) {
(*conn).data = (*ev).data;
(*(*conn).write).handler = Some(on_write);
(*(*conn).read).handler = Some(on_read);

let mut buffers = WriteBuffers::default();
buffers.push(&data.handshake_msg);
*state = match buffers.send(conn) {
Comment thread
ruslanti marked this conversation as resolved.
Ok(()) => State::WaitServerHandshake { conn, buffers },
Err(_) => {
let mut new_ev: Box<ngx_event_t> =
Box::new(MaybeUninit::zeroed().assume_init());
Comment thread
ruslanti marked this conversation as resolved.
new_ev.handler = Some(on_reconnect_timeout);
new_ev.log = (*ngx_cycle).log;
new_ev.data = (*ev).data;

if (*new_ev).timer_set() == 0
&& ngx_quit == 0
&& ngx_exiting == 0
&& ngx_terminate == 0
{
ngx_event_add_timer(&mut *new_ev, *reconnect_timeout);
}

State::Disconnected {
event: new_ev,
reconnect_timeout: *reconnect_timeout,
}
}
};
} else if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0
{
(*ev).handler = Some(on_reconnect_timeout);
ngx_event_add_timer(ev, *reconnect_timeout);
}
}
}
Expand Down