Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions Oars/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Oars
{
public sealed class Buffer : IDisposable
{
bool disposed;
IntPtr handle;
bool ownsBuffer;

Expand All @@ -22,10 +23,20 @@ public Buffer(IntPtr handle)
this.handle = handle;
}

void Dispose(bool disposing)
{
if (!disposed)
{
disposed = true;

if (ownsBuffer)
evbuffer_free(handle);
}
}

public void Dispose()
{
if (ownsBuffer)
evbuffer_free(handle);
Dispose(true);
}

public bool Add(byte[] data, int offset, int count)
Expand All @@ -41,8 +52,6 @@ public int Remove(byte[] data, int offset, int count)
if (offset + count > data.Length)
throw new Exception("offset + count > data.Length");

var c = new IntPtr(count);

unsafe {
fixed (byte *ptr = &data[0])
return evbuffer_remove(handle, ptr, count);
Expand Down
47 changes: 40 additions & 7 deletions Oars/BufferEvent.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal BufferEventEventArgs(BufferEventEvents events)

public sealed class BufferEvent : IDisposable
{
bool disposed;
public EventBase EventBase { get; private set; }
IntPtr bev;

Expand All @@ -32,8 +33,8 @@ public sealed class BufferEvent : IDisposable
public event EventHandler Write;
public event EventHandler<BufferEventEventArgs> Event;

public Buffer Input { get { return input ?? (input = new Buffer(bufferevent_get_input(bev))); } }
public Buffer Output { get { return output ?? (output = new Buffer(bufferevent_get_output(bev))); } }
public Buffer Input { get { if (disposed) throw new ObjectDisposedException("Input EVBuffer"); return input ?? (input = new Buffer(bufferevent_get_input(bev))); } }
public Buffer Output { get { if (disposed) throw new ObjectDisposedException("Ouput EVBuffer"); return output ?? (output = new Buffer(bufferevent_get_output(bev))); } }

int readLow, readHigh = -1;

Expand All @@ -54,23 +55,51 @@ void SetReadWatermark()
bufferevent_setwatermark(bev, Events.EV_READ, new IntPtr(readLow), new IntPtr(readHigh));
}

public BufferEvent(EventBase eventBase, IntPtr socket)
bufferevent_data_cb readdel;
bufferevent_data_cb writedel;
bufferevent_event_cb eventdel;

public BufferEvent(EventBase eventBase, IntPtr socket, int timeout)
{
var options = (int)(BufferEventOptions.CloseOnFree | BufferEventOptions.DeferCallbacks);
bev = bufferevent_socket_new(eventBase.Handle, socket, options);
var t = timeval.FromTimeSpan(TimeSpan.FromMilliseconds(timeout));
bufferevent_set_timeouts(bev, ref t, ref t);
//Console.WriteLine("bufferevent_socket_new returned " + bev.ToInt32());

// none of these can throw exceptions.
var readCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(ReadCallbackInternal));
var writeCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_data_cb(WriteCallbackInternal));
var eventCb = Marshal.GetFunctionPointerForDelegate(new bufferevent_event_cb(EventCallbackInternal));
readdel = new bufferevent_data_cb(ReadCallbackInternal);
var readCb = Marshal.GetFunctionPointerForDelegate(readdel);
writedel = new bufferevent_data_cb(WriteCallbackInternal);
var writeCb = Marshal.GetFunctionPointerForDelegate(writedel);
eventdel = new bufferevent_event_cb(EventCallbackInternal);
var eventCb = Marshal.GetFunctionPointerForDelegate(eventdel);

bufferevent_setcb(bev, readCb, writeCb, eventCb, IntPtr.Zero);
}

void Dispose(bool disposing)
{
if (!disposed)
{
disposed = true;
if (disposing)
{
if (input != null)
input.Dispose();
if (output != null)
output.Dispose();
readdel = null;
writedel = null;
eventdel = null;
}
bufferevent_free(bev);
}
}

public void Dispose()
{
bufferevent_free(bev);
Dispose(true);
}

public void Enable()
Expand Down Expand Up @@ -125,13 +154,17 @@ void EventCallbackInternal(IntPtr bev, short what, IntPtr ctx)
private delegate void bufferevent_data_cb(IntPtr bev, IntPtr ctx);
private delegate void bufferevent_event_cb(IntPtr bev, short what, IntPtr ctx);

[Flags]
enum BufferEventOptions
{
CloseOnFree = 1 << 0,
ThreadSafe = 1 << 1,
DeferCallbacks = 1 << 2
}

[DllImport("event_core")]
static extern void bufferevent_set_timeouts(IntPtr bev, ref timeval timeoutread, ref timeval timeoutwrite);

[DllImport("event_core")]
static extern IntPtr bufferevent_socket_new(IntPtr event_base, IntPtr socket, int options);

Expand Down
28 changes: 21 additions & 7 deletions Oars/ConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@

namespace Oars
{
public sealed class ConnectionAcceptedEventArgs : EventArgs
{
public IntPtr Socket { get; private set; }
public IPEndPoint RemoteEndPoint { get; private set; }

internal ConnectionAcceptedEventArgs(IntPtr socket, IPEndPoint remoteEndPoint)
{
Socket = socket;
RemoteEndPoint = remoteEndPoint;
}
}

public sealed class ConnectionListener : IDisposable
{
public Action<IntPtr, IPEndPoint> ConnectionAccepted;
public event EventHandler<ConnectionAcceptedEventArgs> ConnectionAccepted;
public EventBase Base { get; set; }
public IPEndPoint ListenEndPoint { get; private set; }

Expand Down Expand Up @@ -52,17 +64,19 @@ public void Disable()
disabled = true;
}

void ConnectionCallback(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx)
void ConnectionCallback(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx)
{
try
{
if (ConnectionAccepted != null)
ConnectionAccepted(socket, address.ToIPEndPoint());
if (ConnectionAccepted != null){
sockaddr_in sockAddrIn = (sockaddr_in)Marshal.PtrToStructure(address, typeof(sockaddr_in));
ConnectionAccepted(this, new ConnectionAcceptedEventArgs(socket,
new IPEndPoint(sockAddrIn.sin_addr.s_addr, (ushort)IPAddress.NetworkToHostOrder((short)sockAddrIn.sin_port))));
}
}
catch (Exception e)
catch (Exception)
{
Debug.WriteLine("Exception during connection listener callback.");
//Extensions.HandleException("EVConnListener callback", e);
}
}

Expand All @@ -77,7 +91,7 @@ enum ConnectionListenerOptions
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, sockaddr_in address, int socklen, IntPtr ctx);
private delegate void evconnlistener_cb(IntPtr listener, IntPtr socket, IntPtr address, int socklen, IntPtr ctx);

[DllImport("event_core")]
private unsafe static extern IntPtr evconnlistener_new_bind(IntPtr event_base, IntPtr cb, IntPtr ctx, short flags, short backlog, ref sockaddr_in sa, short socklen);
Expand Down
17 changes: 9 additions & 8 deletions Oars/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Oars
{
[Flags]
public enum Events : short
{
None = 0,
Expand All @@ -17,14 +18,14 @@ public enum Events : short

public sealed class Event : IDisposable
{
public IntPtr Socket { get; private set; }
IntPtr fd;
IntPtr fp;
public IntPtr Handle { get; private set; }
public Events Events { get; private set; }

public Action Activated;
public event EventHandler Activated;
bool pending;
Delegate cb;
IntPtr fp;

public static Event CreateTimer(EventBase eventBase)
{
Expand All @@ -38,17 +39,17 @@ public static Event CreateTimer(EventBase eventBase, bool persist)

public Event(EventBase eventBase, IntPtr fd, Events what)
{
Socket = fd;
this.fd = fd;
cb = Delegate.CreateDelegate(typeof(event_callback_fn), this, "EventCallbackInternal");

fp = Marshal.GetFunctionPointerForDelegate(cb);
Debug.Write("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x"));
Debug.WriteLine("EVEvent created with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x"));
Handle = event_new(eventBase.Handle, fd, (short)what, fp, IntPtr.Zero);
}

public void Dispose()
{
Debug.Write("EVEvent disposed with fd " + Socket.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x"));
Debug.WriteLine("EVEvent disposed with fd " + fd.ToInt32().ToString("x") + ", cb " + fp.ToInt32().ToString("x"));
ThrowIfDisposed();

if (pending)
Expand Down Expand Up @@ -95,9 +96,9 @@ void EventCallbackInternal(IntPtr fd, short what, IntPtr ctx)
{
Debug.WriteLine("Event on fd {0} activated with events {1}.", fd.ToInt32(), Events);
if (Activated != null)
Activated();
Activated(this, EventArgs.Empty);
}
catch (Exception e)
catch (Exception)
{
Debug.WriteLine("Exception during event callback.");
}
Expand Down
3 changes: 2 additions & 1 deletion Oars/EventBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ namespace Oars
public enum LoopOptions : int
{
Once = 0x01,
NonBlock = 0x02
NonBlock = 0x02,
NoExitOnEmpy = 0x04
}

public sealed class EventBase : IDisposable
Expand Down
43 changes: 43 additions & 0 deletions Oars/EventSyncContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Threading;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Oars
{
public sealed class EventSyncContext : SynchronizationContext
{
readonly BlockingCollection<KeyValuePair<SendOrPostCallback,object>> m_queue =
new BlockingCollection<KeyValuePair<SendOrPostCallback,object>>();

EventBase eventBase = new EventBase();

public EventBase EventBase
{
get
{
return eventBase;
}
}

public override void Post(SendOrPostCallback d, object state)
{
m_queue.Add(new KeyValuePair<SendOrPostCallback,object>(d, state));
}

public void RunOnCurrentThread()
{
KeyValuePair<SendOrPostCallback, object> workItem;
while (true)
{
while (m_queue.TryTake(out workItem, 1))
workItem.Key(workItem.Value);
eventBase.Loop(LoopOptions.NonBlock);
}
}

public void Complete() {
m_queue.CompleteAdding();
}
}
}

35 changes: 6 additions & 29 deletions Oars/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Oars
{
internal struct timeval
struct timeval
{
public int tv_sec;
public int tv_usec;
Expand Down Expand Up @@ -71,7 +71,7 @@ public IPEndPoint ToIPEndPoint()
[StructLayout(LayoutKind.Sequential)]
struct in_addr
{
public long s_addr;
public uint s_addr;
}

static class OperatingSystem
Expand Down Expand Up @@ -125,38 +125,15 @@ static bool IsRunningOnMac()
}
}

public static class FDExtensions
static class FDExtensions
{
[DllImport("libc")]
static extern int close(IntPtr fd);

public static int Close(this IntPtr fd)
{
return close(fd);
}

public static int Recv(this IntPtr fd, ArraySegment<byte> buffer, int flags)
{
unsafe
{
fixed (byte* ptr = &(buffer.Array[buffer.Offset]))
return recv(fd, ptr, buffer.Count, flags);
}
}

public static int Send(this IntPtr fd, ArraySegment<byte> buffer, int flags)
{
unsafe {
fixed (byte *ptr = &(buffer.Array[buffer.Offset]))
return send(fd, ptr, buffer.Count, flags);
}
}

[DllImport("libc")]
static extern int close(IntPtr fd);

[DllImport("libc")]
static unsafe extern int send(IntPtr fd, byte* buffer, int length, int flags);

[DllImport("libc")]
static unsafe extern int recv(IntPtr fd, byte* buffer, int length, int flags);
}

// lifted this from Mono.Unix/Stdlib.cs
Expand Down