dwerner/core-executor

core-executor

A custom async executor for Rust with CPU affinity and NUMA awareness support, designed for CPU-heavy workloads.

Features

  • Thread Pool Executor: Manages a pool of thread-affine executors, one per core
  • CPU Affinity: Pin tasks to specific CPU cores for better cache locality
  • NUMA Awareness: Detect and utilize NUMA topology for optimal memory access patterns
  • Bidirectional Channels: Custom channel implementations (Bichannel, Hookshot)
  • Task Cancellation: Graceful shutdown support with TaskWithShutdown

Overview

This executor is designed to efficiently handle CPU-intensive async tasks by:

  • Pinning threads to specific CPU cores
  • Allowing explicit task placement on specific cores or NUMA nodes
  • Supporting round-robin task distribution (globally or per NUMA node)
  • Minimizing cross-NUMA memory access for better performance
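Round-robin distribution can be illustrated with a small counter-based picker. This is a minimal sketch using only the standard library; `RoundRobin` and `pick` are hypothetical names for illustration and are not part of this crate's API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin picker (illustration only, not the crate's type).
pub struct RoundRobin {
    next: AtomicUsize,
}

impl RoundRobin {
    pub fn new() -> Self {
        Self { next: AtomicUsize::new(0) }
    }

    /// Returns the next entry of `cores`, wrapping around at the end.
    /// Returns `None` when `cores` is empty.
    pub fn pick(&self, cores: &[usize]) -> Option<usize> {
        if cores.is_empty() {
            return None;
        }
        // fetch_add hands each caller a distinct ticket, even across threads.
        let ticket = self.next.fetch_add(1, Ordering::Relaxed);
        Some(cores[ticket % cores.len()])
    }
}
```

Passing the full core list gives global round-robin. Keeping one picker per NUMA node and passing only that node's cores gives per-node round-robin.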

Usage

Basic Usage

use core_executor::ThreadPoolExecutor;

// Create an executor with 8 cores
let mut executor = ThreadPoolExecutor::new(8);

// Spawn a task on a specific core
let on_core = executor.spawn_on_core(0, async {
    // CPU-intensive work here
    42
});

// Spawn a task on any available core (round-robin)
let on_any = executor.spawn_on_any(async {
    // Work that can run on any core
    "result"
});

// Block and wait for both results
let answer = futures_lite::future::block_on(on_core);
let label = futures_lite::future::block_on(on_any);
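To make the "one worker per core" idea concrete, here is a rough standard-library sketch of a pool in which each worker thread owns its own job queue. It is not this crate's implementation: `MiniPool` and `run_on_core` are hypothetical names, the jobs are plain closures rather than futures, and the thread pinning step is only marked with a comment.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for a per-core pool (illustration only).
pub struct MiniPool {
    queues: Vec<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl MiniPool {
    /// Starts `cores` worker threads, each with a private job queue.
    pub fn new(cores: usize) -> Self {
        let mut queues = Vec::with_capacity(cores);
        let mut workers = Vec::with_capacity(cores);
        for _core in 0..cores {
            let (tx, rx) = channel::<Job>();
            queues.push(tx);
            workers.push(thread::spawn(move || {
                // A real executor would pin this thread to its core here.
                for job in rx {
                    job();
                }
            }));
        }
        Self { queues, workers }
    }

    /// Runs `f` on the worker for `core` and returns a receiver for its result.
    /// Returns `None` if `core` is out of range or the worker has stopped.
    pub fn run_on_core<T, F>(&self, core: usize, f: F) -> Option<Receiver<T>>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let queue = self.queues.get(core)?;
        let (tx, rx) = channel();
        let job: Job = Box::new(move || {
            // Ignore the error if the caller already dropped the receiver.
            let _ = tx.send(f());
        });
        queue.send(job).ok()?;
        Some(rx)
    }
}

impl Drop for MiniPool {
    fn drop(&mut self) {
        // Dropping the senders ends each worker loop; then wait for the threads.
        self.queues.clear();
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

Because every worker has its own queue, a job submitted for core N only ever runs on that worker's thread, which is the property that makes cache locality possible once the thread is pinned.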

NUMA-Aware Execution

use core_executor::ThreadPoolExecutor;

// Create a NUMA-aware executor (automatically detects topology)
let mut executor = ThreadPoolExecutor::new_numa_aware()?;

// Spawn a task on a specific NUMA node
let future = executor.spawn_on_numa_node(0, async {
    // Work that benefits from NUMA locality
    // (`compute_intensive_task` is a placeholder for your own function)
    compute_intensive_task()
})?;

// Check NUMA topology
if let Some(topology) = executor.numa_topology() {
    println!("System has {} NUMA nodes", topology.num_nodes());
    println!("Total cores: {}", topology.total_cores);
}
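For readers unfamiliar with NUMA topology, the information involved can be pictured as a mapping from node IDs to core IDs. The sketch below is a hypothetical data structure for illustration only. It does not reflect the crate's actual topology type, which is detected through hwlocality. Note that `total_cores` is a method here, whereas the example above reads it as a field.

```rust
/// Hypothetical topology description: `nodes[n]` lists the core IDs on NUMA node `n`.
pub struct Topology {
    nodes: Vec<Vec<usize>>,
}

impl Topology {
    pub fn new(nodes: Vec<Vec<usize>>) -> Self {
        Self { nodes }
    }

    /// Number of NUMA nodes.
    pub fn num_nodes(&self) -> usize {
        self.nodes.len()
    }

    /// Total number of cores across all nodes.
    pub fn total_cores(&self) -> usize {
        self.nodes.iter().map(Vec::len).sum()
    }

    /// Cores that belong to `node`, or `None` if the node does not exist.
    pub fn cores_on(&self, node: usize) -> Option<&[usize]> {
        self.nodes.get(node).map(Vec::as_slice)
    }

    /// The node that owns `core`, or `None` if the core is unknown.
    pub fn node_of(&self, core: usize) -> Option<usize> {
        self.nodes.iter().position(|cores| cores.contains(&core))
    }
}
```

With a lookup like `cores_on`, spawning "on a NUMA node" reduces to choosing one of that node's cores, so the task and the memory it touches stay on the same node.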

Advanced Features

Task Cancellation with Shutdown Guards

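use core_executor::ThreadAffineSpawner;
use std::time::Duration;

// `spawner` is a ThreadAffineSpawner obtained elsewhere (not shown here).
// Spawn a long-running task with graceful shutdown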
spawner.spawn_with_shutdown(|shutdown| async move {
    loop {
        // Do work...
        
        // Check if shutdown was requested
        if shutdown.should_exit() {
            break;
        }
        
        // Continue processing...
        smol::Timer::after(Duration::from_millis(100)).await;
    }
});
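The cooperative shutdown pattern used above can be sketched with a shared atomic flag. This is an assumption about the general technique, not a description of how `TaskWithShutdown` is implemented. `ShutdownFlag` and `run_until_shutdown` are hypothetical names, and the loop is synchronous to keep the example dependency-free.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical shutdown handle: cheap to clone, shared between task and owner.
#[derive(Clone)]
pub struct ShutdownFlag(Arc<AtomicBool>);

impl ShutdownFlag {
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Asks the task to stop at its next check.
    pub fn request(&self) {
        self.0.store(true, Ordering::Release);
    }

    /// Polled by the task between units of work.
    pub fn should_exit(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Calls `step` until shutdown is requested or `max_iters` is reached.
/// Returns how many steps actually ran.
pub fn run_until_shutdown(
    flag: &ShutdownFlag,
    max_iters: usize,
    mut step: impl FnMut(usize),
) -> usize {
    let mut done = 0;
    while done < max_iters && !flag.should_exit() {
        step(done);
        done += 1;
    }
    done
}
```

The task is never interrupted mid-step. It only stops at the points where it checks the flag, which is what makes the shutdown graceful.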

Custom Channels

Bichannel - Bidirectional channel

use core_executor::channel::Bichannel;

// Create a bounded bidirectional channel
let (left, right): (Bichannel<String, i32>, Bichannel<i32, String>) = 
    Bichannel::bounded(10);

// Send from left to right
left.send("Hello".to_string()).await?;
let msg = right.recv().await?;

// Send back from right to left
right.send(42).await?;
let reply = left.recv().await?;
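Conceptually, a bidirectional channel can be built from two one-way channels with the ends crossed over. The sketch below shows that idea with blocking standard-library channels. `Endpoint` and `bounded_pair` are hypothetical names, and the real `Bichannel` is async (the dependency list suggests it builds on async-channel), so treat this as an illustration of the shape only.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SendError, SyncSender};

/// Hypothetical endpoint that sends values of type `S` and receives values of type `R`.
pub struct Endpoint<S, R> {
    tx: SyncSender<S>,
    rx: Receiver<R>,
}

impl<S, R> Endpoint<S, R> {
    /// Blocks while the outgoing queue is full.
    pub fn send(&self, value: S) -> Result<(), SendError<S>> {
        self.tx.send(value)
    }

    /// Blocks until a value arrives or the peer is dropped.
    pub fn recv(&self) -> Result<R, RecvError> {
        self.rx.recv()
    }
}

/// Creates two crossed endpoints, each direction bounded to `cap` queued items.
pub fn bounded_pair<A, B>(cap: usize) -> (Endpoint<A, B>, Endpoint<B, A>) {
    let (a_tx, a_rx) = sync_channel::<A>(cap);
    let (b_tx, b_rx) = sync_channel::<B>(cap);
    (
        Endpoint { tx: a_tx, rx: b_rx },
        Endpoint { tx: b_tx, rx: a_rx },
    )
}
```

The mirrored type parameters (`Endpoint<A, B>` and `Endpoint<B, A>`) match the `Bichannel<String, i32>` / `Bichannel<i32, String>` pair in the example above: what one side sends is what the other receives.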

Hookshot - Single-shot bidirectional channel

use core_executor::channel::Hookshot;

// Create a one-shot bidirectional channel
let (sender, receiver) = Hookshot::<String, i32>::new();

// Send a single message
sender.send_blocking("one-time message".to_string())?;
let msg = receiver.recv().await?;

// Send reply back
receiver.send_back(42).await?;
let reply = sender.recv_back().await?;
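One way to enforce "exactly one message, exactly one reply" is to have each operation consume the handle it is called on, so the compiler rejects a second send. The sketch below illustrates that approach with blocking standard-library channels. All names (`one_shot`, `Requester`, `Responder`, `PendingReply`, `ReplySlot`) are hypothetical, and the actual `Hookshot` API shown above differs and may not use this consuming style.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SendError, SyncSender};

pub struct Requester<Req, Rep> {
    tx: SyncSender<Req>,
    reply: Receiver<Rep>,
}
pub struct PendingReply<Rep> {
    reply: Receiver<Rep>,
}
pub struct Responder<Req, Rep> {
    rx: Receiver<Req>,
    reply: SyncSender<Rep>,
}
pub struct ReplySlot<Rep> {
    reply: SyncSender<Rep>,
}

/// Creates a hypothetical single-use request/reply pair.
pub fn one_shot<Req, Rep>() -> (Requester<Req, Rep>, Responder<Req, Rep>) {
    let (tx, rx) = sync_channel(1);
    let (reply_tx, reply_rx) = sync_channel(1);
    (
        Requester { tx, reply: reply_rx },
        Responder { rx, reply: reply_tx },
    )
}

impl<Req, Rep> Requester<Req, Rep> {
    /// Consumes the requester, so only one message can ever be sent.
    pub fn send(self, msg: Req) -> Result<PendingReply<Rep>, SendError<Req>> {
        self.tx.send(msg)?;
        Ok(PendingReply { reply: self.reply })
    }
}

impl<Rep> PendingReply<Rep> {
    /// Waits for the single reply.
    pub fn recv_back(self) -> Result<Rep, RecvError> {
        self.reply.recv()
    }
}

impl<Req, Rep> Responder<Req, Rep> {
    /// Receives the single message and hands back a slot for the reply.
    pub fn recv(self) -> Result<(Req, ReplySlot<Rep>), RecvError> {
        let msg = self.rx.recv()?;
        Ok((msg, ReplySlot { reply: self.reply }))
    }
}

impl<Rep> ReplySlot<Rep> {
    /// Consumes the slot, so only one reply can ever be sent.
    pub fn send_back(self, rep: Rep) -> Result<(), SendError<Rep>> {
        self.reply.send(rep)
    }
}
```

The trade-off of this style is that misuse becomes a compile error rather than a runtime error, at the cost of a few extra handle types.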

API Reference

ThreadPoolExecutor

  • new(cores: usize) - Create executor with specified number of cores
  • new_numa_aware() - Create NUMA-aware executor with auto-detected topology
  • spawn_on_core(core_id, task) - Spawn task on specific CPU core
  • spawn_on_any(task) - Spawn task on any available core (round-robin)
  • spawn_on_numa_node(node_id, task) - Spawn task on specific NUMA node
  • numa_topology() - Get NUMA topology information if available

ThreadAffineSpawner

  • spawn(task) - Spawn a task and get a future for its result
  • fire(task) - Fire-and-forget task execution
  • spawn_with_shutdown(task_fn) - Spawn with graceful shutdown support

Error Types

  • ExecutorError - Main error type for executor operations
  • BichannelError - Errors from Bichannel operations
  • HookshotError - Errors from Hookshot operations

Dependencies

  • async-executor: Local executor for each thread
  • core_affinity: CPU affinity control
  • hwlocality: NUMA topology detection and management
  • async-channel, async-oneshot: Channel primitives
  • wat-cpu: Helper for getting current CPU ID

License

This project is dual-licensed under MIT OR Apache-2.0.
