Skip to content
Draft
103 changes: 62 additions & 41 deletions bin/wfbench
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2025 The WfCommons Team.
Expand All @@ -20,13 +20,17 @@ import re
import json
import logging
import pandas as pd
import psutil

from io import StringIO
from filelock import FileLock
from pathos.helpers import mp as multiprocessing
from typing import List, Optional


int32_max = 2147483647


# Configure logging
logging.basicConfig(
level=logging.INFO, # Change this to control the verbosity
Expand All @@ -39,6 +43,20 @@ logging.basicConfig(
this_dir = pathlib.Path(__file__).resolve().parent


def kill_process_and_children(proc):
    """
    Forcefully kill a process and its entire descendant tree.

    :param proc: a ``subprocess.Popen``-like object exposing a ``pid``
                 attribute, or ``None`` (in which case this is a no-op).

    Any process that has already exited is silently ignored.
    """
    if proc is None:
        return
    try:
        parent = psutil.Process(proc.pid)
        # Kill children first so none outlive (or get re-parented away
        # from) the parent we are about to kill.
        for child in parent.children(recursive=True):
            # Guard each kill individually: a child may exit on its own
            # between the children() snapshot and the kill() call.  Without
            # this per-child guard, one vanished child would raise into the
            # outer handler and abort killing the parent and the remaining
            # children (process leak).
            try:
                child.kill()
            except psutil.NoSuchProcess:
                pass  # Child already dead
        parent.kill()

    except psutil.NoSuchProcess:
        pass  # Process is already dead


def log_info(msg: str):
"""
Log an info message to stderr
Expand Down Expand Up @@ -165,34 +183,39 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
:rtype: List
"""
total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%"
cpu_work_per_thread = int(cpu_work / cpu_threads)

cpu_procs = []
mem_procs = []
cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"]
cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2
cpu_samples = min(cpu_work_per_thread, int32_max)
cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max
if cpu_ops > int32_max:
log_info("Exceeded maximum allowed value of cpu work.")
cpu_ops = int32_max

cpu_proc = None
mem_proc = None

cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}",
"--monte-carlo-method", "pi",
"--monte-carlo-rand", "lcg",
"--monte-carlo-samples", f"{cpu_samples}",
"--monte-carlo-ops", f"{cpu_ops}",
"--quiet"]
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
"--vm-bytes", f"{total_mem}", "--vm-keep"]
"--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"]

for i in range(cpu_threads):
cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if cpu_threads > 0:
cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid)

# NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
if core:
os.sched_setaffinity(cpu_proc.pid, {core})
cpu_procs.append(cpu_proc)

# Start a thread to monitor the progress of each CPU benchmark process
monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue))
monitor_thread.start()

if mem_threads > 0:
# NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
if core:
os.sched_setaffinity(mem_proc.pid, {core})
mem_procs.append(mem_proc)

return cpu_procs, mem_procs
return [cpu_proc, mem_proc]


def io_read_benchmark_user_input_data_size(inputs,
Expand Down Expand Up @@ -240,7 +263,7 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit
for name in outputs:
open(rundir.joinpath(name), "wb").close()

io_completed = 0
io_completed = 1
bytes_read = {
name: 0
for name in inputs
Expand All @@ -257,21 +280,21 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit
}

while io_completed < 100:
cpu_percent = max(io_completed, cpu_queue.get())
while True: # Get the last message
try:
cpu_percent = max(io_completed, cpu_queue.get_nowait())
except queue.Empty:
break

log_debug(f"CPU Percent: {cpu_percent}")
if cpu_percent:
#cpu_percent = max(io_completed, cpu_queue.get())
#while True: # Get the last message
# try:
# cpu_percent = max(io_completed, cpu_queue.get_nowait())
# except queue.Empty:
# break

log_debug(f"IO Percent: {io_completed}")
if True: #cpu_percent:
bytes_to_read = {
name: int(size * (cpu_percent / 100) - bytes_read[name])
name: int(size * (io_completed / 100) - bytes_read[name])
for name, size in inputs.items()
}
bytes_to_write = {
name: int(size * (cpu_percent / 100) - bytes_written[name])
name: int(size * (io_completed / 100) - bytes_written[name])
for name, size in outputs.items()
}
io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit)
Expand All @@ -289,7 +312,7 @@ def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit
log_debug(f"Bytes Read: {bytes_read}")
log_debug(f"Bytes Written: {bytes_written}")

io_completed = cpu_percent
io_completed = io_completed + 1

if io_completed >= 100:
break
Expand Down Expand Up @@ -374,7 +397,7 @@ def main():

log_info(f"Starting {args.name} Benchmark")

mem_bytes = args.mem * 1024 * 1024 if args.mem else None
mem_bytes = args.mem * 1000 * 1000 if args.mem else None

procs = []
io_proc = None
Expand Down Expand Up @@ -446,35 +469,33 @@ def main():
log_debug(f"{args.name} acquired core {core}")

mem_threads=int(10 - 10 * args.percent_cpu)
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue,
[cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue,
cpu_threads=int(10 * args.percent_cpu),
mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
cpu_work=int32_max**2 if args.time else int(args.cpu_work),
core=core,
total_mem=mem_bytes)

procs.extend(cpu_procs)
procs.append(cpu_proc)
if args.time:
time.sleep(int(args.time))
for proc in procs:
if isinstance(proc, multiprocessing.Process):
if proc.is_alive():
proc.terminate()
elif isinstance(proc, subprocess.Popen):
proc.terminate()
kill_process_and_children(proc)
else:
for proc in procs:
if isinstance(proc, subprocess.Popen):
proc.wait()
if io_proc is not None and io_proc.is_alive():
# io_proc.terminate()
#io_proc.terminate()
io_proc.join()

for mem_proc in mem_procs:
try:
os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
try:
kill_process_and_children(mem_proc)
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
# As a fallback, use pkill if any remaining instances are stuck
subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ dependencies = [
"networkx",
"numpy",
"pandas",
"psutil",
"python-dateutil",
"requests",
"scipy>=1.16.1",
"pyyaml",
"pandas",
"shortuuid",
"stringcase",
"filelock",
Expand Down
5 changes: 3 additions & 2 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
working_dir=working_dir,
user="wfcommons",
tty=True,
detach=True
detach=True,
init=True # For zombies
)

# Installing WfCommons on container
Expand Down Expand Up @@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):
# sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n")
workflow2_output_bytes += output_file.size
assert (workflow1_input_bytes == workflow2_input_bytes)
assert (workflow1_output_bytes == workflow2_output_bytes)
assert (workflow1_output_bytes == workflow2_output_bytes)
Loading