diff --git a/Oars/Buffer.cs b/Oars/Buffer.cs index ca79eeb..467c112 100755 --- a/Oars/Buffer.cs +++ b/Oars/Buffer.cs @@ -6,6 +6,7 @@ namespace Oars { public sealed class Buffer : IDisposable { + bool disposed; IntPtr handle; bool ownsBuffer; @@ -22,10 +23,20 @@ public Buffer(IntPtr handle) this.handle = handle; } + void Dispose(bool disposing) + { + if (!disposed) + { + disposed = true; + + if (ownsBuffer) + evbuffer_free(handle); + } + } + public void Dispose() { - if (ownsBuffer) - evbuffer_free(handle); + Dispose(true); } public bool Add(byte[] data, int offset, int count) @@ -41,8 +52,6 @@ public int Remove(byte[] data, int offset, int count) if (offset + count > data.Length) throw new Exception("offset + count > data.Length"); - var c = new IntPtr(count); - unsafe { fixed (byte *ptr = &data[0]) return evbuffer_remove(handle, ptr, count); diff --git a/Oars/BufferEvent.cs b/Oars/BufferEvent.cs old mode 100755 new mode 100644 index 13b6026..2f4b06a --- a/Oars/BufferEvent.cs +++ b/Oars/BufferEvent.cs @@ -22,6 +22,7 @@ internal BufferEventEventArgs(BufferEventEvents events) public sealed class BufferEvent : IDisposable { + bool disposed; public EventBase EventBase { get; private set; } IntPtr bev; @@ -32,8 +33,8 @@ public sealed class BufferEvent : IDisposable public event EventHandler Write; public event EventHandler Event; - public Buffer Input { get { return input ?? (input = new Buffer(bufferevent_get_input(bev))); } } - public Buffer Output { get { return output ?? (output = new Buffer(bufferevent_get_output(bev))); } } + public Buffer Input { get { if (disposed) throw new ObjectDisposedException("Input EVBuffer"); return input ?? (input = new Buffer(bufferevent_get_input(bev))); } } + public Buffer Output { get { if (disposed) throw new ObjectDisposedException("Ouput EVBuffer"); return output ?? (output = new Buffer(bufferevent_get_output(bev))); } } int readLow, readHigh = -1; @@ -54,23 +55,51 @@ void SetReadWatermark() bufferevent_setwatermark(bev, Events.EV_READ, new IntPtr(readLow), new IntPtr(readHigh)); } - public BufferEvent(EventBase eventBase, IntPtr socket) + bufferevent_data_cb readdel; + bufferevent_data_cb writedel; + bufferevent_event_cb eventdel; + + public BufferEvent(EventBase eventBase, IntPtr socket, int timeout) { var options = (int)(BufferEventOptions.CloseOnFree | BufferEventOptions.DeferCallbacks); bev = bufferevent_socket_new(eventBase.Handle, socket, options); + var t = timeval.FromTimeSpan(TimeSpan.FromMilliseconds(timeout)); + bufferevent_set_timeouts(bev, ref t, ref t); //Console.WriteLine("bufferevent_socket_new returned " + bev.ToInt32()); // none of these can throw exceptions. - var readCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(ReadCallbackInternal)); - var writeCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(WriteCallbackInternal)); - var eventCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_event_cb(EventCallbackInternal)); + readdel = new bufferevent_data_cb(ReadCallbackInternal); + var readCb = Marshal.GetFunctionPointerForDelegate(readdel); + writedel = new bufferevent_data_cb(WriteCallbackInternal); + var writeCb = Marshal.GetFunctionPointerForDelegate(writedel); + eventdel = new bufferevent_event_cb(EventCallbackInternal); + var eventCb = Marshal.GetFunctionPointerForDelegate(eventdel); bufferevent_setcb(bev, readCb, writeCb, eventCb, IntPtr.Zero); } + void Dispose(bool disposing) + { + if (!disposed) + { + disposed = true; + if (disposing) + { + if (input != null) + input.Dispose(); + if (output != null) + output.Dispose(); + readdel = null; + writedel = null; + eventdel = null; + } + bufferevent_free(bev); + } + } + public void Dispose() { - bufferevent_free(bev); + Dispose(true); } public void Enable() @@ -125,6 +154,7 @@ void EventCallbackInternal(IntPtr bev, short what, IntPtr ctx) private delegate void bufferevent_data_cb(IntPtr bev, IntPtr ctx); private delegate void bufferevent_event_cb(IntPtr bev, short what, IntPtr ctx); + [Flags] enum BufferEventOptions { CloseOnFree = 1 << 0, @@ -132,6 +162,9 @@ enum BufferEventOptions DeferCallbacks = 1 << 2 } + [DllImport("event_core")] + static extern void bufferevent_set_timeouts(IntPtr bev, ref timeval timeoutread, ref timeval timeoutwrite); + [DllImport("event_core")] static extern IntPtr bufferevent_socket_new(IntPtr event_base, IntPtr socket, int options); diff --git a/Oars/ConnectionListener.cs b/Oars/ConnectionListener.cs index 37ca309..e29c78c 100755 --- a/Oars/ConnectionListener.cs +++ b/Oars/ConnectionListener.cs @@ -5,9 +5,21 @@ namespace Oars { + public sealed class ConnectionAcceptedEventArgs : EventArgs + { + public IntPtr Socket { get; private set; } + public IPEndPoint RemoteEndPoint { get; private set; } + + internal ConnectionAcceptedEventArgs(IntPtr socket, IPEndPoint remoteEndPoint) + { + Socket = socket; + RemoteEndPoint = remoteEndPoint; + } + } + public sealed class ConnectionListener : IDisposable { - public Action ConnectionAccepted; + public event EventHandler ConnectionAccepted; public EventBase Base { get; set; } public IPEndPoint ListenEndPoint { get; private set; } @@ -52,17 +64,19 @@ public void Disable() disabled = true; } - void ConnectionCallback(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx) + void ConnectionCallback(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx) { try { - if (ConnectionAccepted != null) - ConnectionAccepted(socket, address.ToIPEndPoint()); + if (ConnectionAccepted != null){ + sockaddr_in sockAddrIn = (sockaddr_in)Marshal.PtrToStructure(address, typeof(sockaddr_in)); + ConnectionAccepted(this, new ConnectionAcceptedEventArgs(socket, + new IPEndPoint(sockAddrIn.sin_addr.s_addr, (ushort)IPAddress.NetworkToHostOrder((short)sockAddrIn.sin_port)))); + } } - catch (Exception e) + catch (Exception) { Debug.WriteLine("Exception during connection listener callback."); - //Extensions.HandleException("EVConnListener callback", e); } } @@ -77,7 +91,7 @@ enum ConnectionListenerOptions } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx); + private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx); [DllImport("event_core")] private unsafe static extern IntPtr evconnlistener_new_bind(IntPtr event_base, IntPtr cb, IntPtr ctx, short flags, short backlog, ref sockaddr_in sa, short socklen); diff --git a/Oars/Event.cs b/Oars/Event.cs index b97ccde..610e4ad 100755 --- a/Oars/Event.cs +++ b/Oars/Event.cs @@ -4,6 +4,7 @@ namespace Oars { + [Flags] public enum Events : short { None = 0, @@ -17,14 +18,14 @@ public enum Events : short public sealed class Event : IDisposable { - public IntPtr Socket { get; private set; } + IntPtr fd; + IntPtr fp; public IntPtr Handle { get; private set; } public Events Events { get; private set; } - public Action Activated; + public event EventHandler Activated; bool pending; Delegate cb; - IntPtr fp; public static Event CreateTimer(EventBase eventBase) { @@ -38,17 +39,17 @@ public static Event CreateTimer(EventBase eventBase, bool persist) public Event(EventBase eventBase, IntPtr fd, Events what) { - Socket = fd; + this.fd = fd; cb = Delegate.CreateDelegate(typeof(event_callback_fn), this, "EventCallbackInternal"); fp = Marshal.GetFunctionPointerForDelegate(cb); - Debug.Write("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); + Debug.WriteLine("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); Handle = event_new(eventBase.Handle, fd, (short)what, fp, IntPtr.Zero); } public void Dispose() { - Debug.Write("EVEvent disposed with fd " + Socket.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); + Debug.WriteLine("EVEvent disposed with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x")); ThrowIfDisposed(); if (pending) @@ -95,9 +96,9 @@ void EventCallbackInternal(IntPtr fd, short what, IntPtr ctx) { Debug.WriteLine("Event on fd {0} activated with events {1}.", fd.ToInt32(), Events); if (Activated != null) - Activated(); + Activated(this, EventArgs.Empty); } - catch (Exception e) + catch (Exception) { Debug.WriteLine("Exception during event callback."); } diff --git a/Oars/EventBase.cs b/Oars/EventBase.cs index 96c7571..dd3afba 100755 --- a/Oars/EventBase.cs +++ b/Oars/EventBase.cs @@ -6,7 +6,8 @@ namespace Oars public enum LoopOptions : int { Once = 0x01, - NonBlock = 0x02 + NonBlock = 0x02, + NoExitOnEmpy = 0x04 } public sealed class EventBase : IDisposable diff --git a/Oars/EventSyncContext.cs b/Oars/EventSyncContext.cs new file mode 100644 index 0000000..3ca6b32 --- /dev/null +++ b/Oars/EventSyncContext.cs @@ -0,0 +1,43 @@ +using System.Threading; +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace Oars +{ + public sealed class EventSyncContext : SynchronizationContext + { + readonly BlockingCollection> m_queue = + new BlockingCollection>(); + + EventBase eventBase = new EventBase(); + + public EventBase EventBase + { + get + { + return eventBase; + } + } + + public override void Post(SendOrPostCallback d, object state) + { + m_queue.Add(new KeyValuePair(d, state)); + } + + public void RunOnCurrentThread() + { + KeyValuePair workItem; + while (true) + { + while (m_queue.TryTake(out workItem, 1)) + workItem.Key(workItem.Value); + eventBase.Loop(LoopOptions.NonBlock); + } + } + + public void Complete() { + m_queue.CompleteAdding(); + } + } +} + diff --git a/Oars/Interop.cs b/Oars/Interop.cs index 71fb8e2..fa62b7b 100755 --- a/Oars/Interop.cs +++ b/Oars/Interop.cs @@ -5,7 +5,7 @@ namespace Oars { - internal struct timeval + struct timeval { public int tv_sec; public int tv_usec; @@ -71,7 +71,7 @@ public IPEndPoint ToIPEndPoint() [StructLayout(LayoutKind.Sequential)] struct in_addr { - public long s_addr; + public uint s_addr; } static class OperatingSystem @@ -125,38 +125,15 @@ static bool IsRunningOnMac() } } - public static class FDExtensions + static class FDExtensions { + [DllImport("libc")] + static extern int close(IntPtr fd); + public static int Close(this IntPtr fd) { return close(fd); } - - public static int Recv(this IntPtr fd, ArraySegment buffer, int flags) - { - unsafe - { - fixed (byte* ptr = &(buffer.Array[buffer.Offset])) - return recv(fd, ptr, buffer.Count, flags); - } - } - - public static int Send(this IntPtr fd, ArraySegment buffer, int flags) - { - unsafe { - fixed (byte *ptr = &(buffer.Array[buffer.Offset])) - return send(fd, ptr, buffer.Count, flags); - } - } - - [DllImport("libc")] - static extern int close(IntPtr fd); - - [DllImport("libc")] - static unsafe extern int send(IntPtr fd, byte* buffer, int length, int flags); - - [DllImport("libc")] - static unsafe extern int recv(IntPtr fd, byte* buffer, int length, int flags); } // lifted this from Mono.Unix/Stdlib.cs