diff --git a/oap-http/oap-pnio-v3/src/main/java/oap/http/pniov3/PnioController.java b/oap-http/oap-pnio-v3/src/main/java/oap/http/pniov3/PnioController.java index a0e4a1bd8..b6d2dd038 100644 --- a/oap-http/oap-pnio-v3/src/main/java/oap/http/pniov3/PnioController.java +++ b/oap-http/oap-pnio-v3/src/main/java/oap/http/pniov3/PnioController.java @@ -45,19 +45,26 @@ public class PnioController implements AutoCloseable { } } + public final int maxThreads; final ForkJoinPool forkJoinPool; - private final int maxThreads; public volatile boolean done; + public PnioController( int parallelism, double maxThreadsMultiplicator ) { + this( getForkJoinPoolParallelism( parallelism ), ( int ) ( getForkJoinPoolParallelism( parallelism ) * maxThreadsMultiplicator ) ); + } + public PnioController( int parallelism, int maxThreads ) { - forkJoinPool = new ForkJoinPool( parallelism, new ForkJoinPool.ForkJoinWorkerThreadFactory() { + int forkJoinPoolParallelism = getForkJoinPoolParallelism( parallelism ); + + // ForkJoinPool#MAX_CAP = 0x7fff + forkJoinPool = new ForkJoinPool( forkJoinPoolParallelism, new ForkJoinPool.ForkJoinWorkerThreadFactory() { @SneakyThrows @Override public ForkJoinWorkerThread newThread( ForkJoinPool pool ) { return ( ForkJoinWorkerThread ) carrierThread.invokeWithArguments( pool ); } }, ( t, e ) -> log.error( e.getMessage(), e ), - true, parallelism, parallelism, parallelism, null, 60_000L, TimeUnit.MILLISECONDS ); + true, 0, 0x7fff, forkJoinPoolParallelism, null, 60_000L, TimeUnit.MILLISECONDS ); this.maxThreads = maxThreads; Metrics.gauge( "pnio_controller", Tags.of( "type", "QueuedTask" ), this, _ -> forkJoinPool.getQueuedTaskCount() ); @@ -68,6 +75,16 @@ public ForkJoinWorkerThread newThread( ForkJoinPool pool ) { Metrics.gauge( "pnio_controller", Tags.of( "type", "RequestQueue" ), this, _ -> threadCounter.get() ); } + private static int getForkJoinPoolParallelism( int parallelism ) { + int forkJoinPoolParallelism; + if( parallelism > 0 ) { + forkJoinPoolParallelism = parallelism; + } else { + forkJoinPoolParallelism = Runtime.getRuntime().availableProcessors() - parallelism; + } + return forkJoinPoolParallelism; + } + @Override public void close() { done = true; diff --git a/pom.xml b/pom.xml index 11845909a..8a804ac5b 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.4.6 + 25.4.7 25.0.1 25.0.0