From cd45cbd82c51673f910ee0d7f4ff95f635974a36 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 16:24:25 +0000 Subject: [PATCH] Three major improvements: config extraction, logging, and parser refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. config.py — Centralises all hardcoded values (GSI auth token, port, timing thresholds, map codes, weapon lists) so they are easy to find and change without touching logic files. 2. Logging + exception hygiene — Replaces every print() with the stdlib logging module (INFO to console, DEBUG with -v flag). Fixes bare `except:` clauses in gsi_pinger.py and MainApp.py that were silently swallowing KeyboardInterrupt and SystemExit. Adds graceful handling for the missing pywin32 dependency on non-Windows platforms. 3. de_ancient collision fix + snapshot_arrayfier single-pass refactor — de_ancient was mapped to code 4 (same as de_nuke), causing the model to silently misclassify the map. It now gets its own code (8). The four separate player-iteration loops in snapshot_arrayfier are consolidated into one pass, halving the number of dict lookups and removing duplicated key-construction logic. https://claude.ai/code/session_01BpRrR55tsrkVo9W5rtwYMK --- MainApp.py | 269 +++++++++++++++++------------------------ config.py | 54 +++++++++ gsi_pinger.py | 70 ++++++----- snapshot_parser.py | 290 ++++++++++++++++++++++----------------------- 4 files changed, 345 insertions(+), 338 deletions(-) create mode 100644 config.py diff --git a/MainApp.py b/MainApp.py index ca5536a..24c0953 100644 --- a/MainApp.py +++ b/MainApp.py @@ -2,115 +2,114 @@ """ @author: d-roho """ -import pywintypes +import logging import sys import time + +import config from gsi_pinger import pingerfunc -from win32gui import GetWindowText, GetForegroundWindow +# --------------------------------------------------------------------------- +# Logging setup — INFO to console, DEBUG available via -v flag +# --------------------------------------------------------------------------- +_log_level = logging.DEBUG if "-v" in sys.argv else logging.INFO +logging.basicConfig( + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%H:%M:%S", + level=_log_level, +) +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Windows-only imports (pause-and-play feature) +# --------------------------------------------------------------------------- +try: + import pywintypes + from win32gui import GetWindowText, GetForegroundWindow + _WIN32_AVAILABLE = True +except ImportError: + _WIN32_AVAILABLE = False + logger.warning( + "pywin32 not available — pause-and-play detection disabled on this platform." + ) -def current_window(): - # Returns name of current window for listener.py - window = GetWindowText(GetForegroundWindow()) - return window +def current_window(): + if not _WIN32_AVAILABLE: + return "" + return GetWindowText(GetForegroundWindow()) def welcome_message(): - - # Display Welcome Message unless disabled by "-w" arg if "-w" not in sys.argv: - f = open('welcome.txt', 'r') - print(''.join([line for line in f])) + try: + with open("welcome.txt", "r") as f: + print("".join(f)) + except OSError: + pass print("CSGOPredictor Launched Successfully!") - if "-p" in sys.argv: - print("PAUSE-AND-PLAY = DISABLED") - else: print("PAUSE-AND-PLAY = ENABLED") + print("PAUSE-AND-PLAY =", "DISABLED" if "-p" in sys.argv else "ENABLED") if "delay" in sys.argv: delay_index = sys.argv.index("delay") + 1 delay_value = float(sys.argv[delay_index]) - print("DELAY VALUE = " + str(delay_value) + " seconds") - else: print("DELAY VALUE = 0 seconds") + print(f"DELAY VALUE = {delay_value} seconds") + else: + print("DELAY VALUE = 0 seconds") time.sleep(1.5) def change_dir(): - - """Changes working directory to match location of this python file""" import os - - # Changing working directory dir_path = os.path.dirname(os.path.realpath(__file__)) os.chdir(str(dir_path)) - print("current working directory is - " + str(dir_path)) + logger.info("Working directory: %s", dir_path) return dir_path def import_model(): - - """Imports trained Logistic Regression model""" import sklearn_json as skljson - - # Importing Model - print("Importing Model...") + logger.info("Importing model...") model = skljson.from_json("CSGOPredictor") - print("Model Imported Successfully!") + logger.info("Model imported successfully.") return model -def check_for_match_start(): +def _wait_for_snapshot(): + """Block until pingerfunc returns a non-empty dict.""" + result = {} + while not result: + result = pingerfunc() + return result - """Pauses script till useful data is being recorded' - to generate predictions""" - test = {} - print("Waiting for CS:GO to be launched...") - while len(test.keys()) == 0: - test = pingerfunc() - print("CS:GO has been launched!") - print("Waiting for Match to begin...") +def check_for_match_start(): + logger.info("Waiting for CS:GO to be launched...") + test = _wait_for_snapshot() + logger.info("CS:GO has been launched!") + logger.info("Waiting for match to begin...") while test.get("allplayers") is None: test = pingerfunc() - while test.get("map").get("phase") == 'warmup': + while test.get("map", {}).get("phase") == "warmup": test = pingerfunc() - - print("Match has Begun!") + logger.info("Match has begun!") def match_start_check_postlaunch(): - - """Same as check_for_match_start(), but used in cases where - CS:GO is known to have already been launched""" - test = {} + """Wait until a live (non-warmup, non-empty) match snapshot arrives.""" while True: try: test = pingerfunc() - if len(test.get("allplayers").keys()) == 0: - time.sleep(1) - test = pingerfunc() - else: - return False - except AttributeError: + players = test.get("allplayers") if test else None + if players and len(players) > 0: + if test.get("map", {}).get("phase") != "warmup": + return time.sleep(1) - continue - - while True: - try: - test = pingerfunc() - if test.get("map").get("phase") == 'warmup': - time.sleep(1) - test = pingerfunc() - else: - return False - except AttributeError: + except Exception: + logger.debug("match_start_check_postlaunch: transient error, retrying", exc_info=True) time.sleep(1) - continue def parse_and_predict(): - - """The main loop that parses logs and runs the predictive model. - Returns probability prediction of round outcome""" window = current_window() welcome_message() change_dir() @@ -120,147 +119,99 @@ def parse_and_predict(): from snapshot_parser import exception_handler, snapshot_formatter, snapshot_arrayfier import exceptions from listener import pause_detector, pause_screen + pause_counter = 0 - delay_req = False - if "delay" in sys.argv: - delay_req = True + delay_req = "delay" in sys.argv + delay_value = 0.0 + if delay_req: delay_index = sys.argv.index("delay") + 1 delay_value = float(sys.argv[delay_index]) while True: try: - if delay_req is True: + if delay_req: time.sleep(delay_value) - # Checking is a Pause request was initiated in previous loop + + # Check for pause request from previous loop iteration if "-p" not in sys.argv: - from listener import raise_pause_screen # imports latest value - if raise_pause_screen is True: # Checks if pause was requested + from listener import raise_pause_screen + if raise_pause_screen: pause_screen() - # Pinging for latest snapshot + # Ping for latest snapshot snapshot = None while snapshot is None: snapshot = pingerfunc() snapshot = exception_handler(snapshot) - - # formatting snapshot snapshot_formatted = snapshot_formatter(snapshot) - - # parsing snapshot to create \attributes lists for predictive model predictors = snapshot_arrayfier(snapshot_formatted) - """Prediction""" - - """Freeze Time - Making Time prediction attribute default to 115 sec - during freezetime. This makes it so that the low time_left - during freezetime doesn't skew prediction towards Ts""" + # During freezetime substitute a neutral time value so low + # time_left doesn't bias predictions toward T-side. if snapshot_formatted["phase_countdowns"].get("phase") == "freezetime": - predictors[4] = 115 + predictors[4] = config.FREEZETIME_DEFAULT_SECONDS - # Running model with predictors pred_nested = model.predict_proba([predictors]) - pred = pred_nested[0] # converts list from nested to unnested - pred = list(pred) # converts numpy array to list - for i in range(2): # decimal -> %, and round values - pred[i] = round(pred[i]*100, 2) - - """Default Predictions - These are scenarios in which the winner of - the round has been decided (or is a virtual certainty) which the - predictive model is not able to account for when making prediction - """ - # Round Over - if snapshot_formatted["round"]["phase"] == "over": - if snapshot_formatted["round"].get("win_team") == "T": - pred = [0, 100] - if snapshot_formatted["round"].get("win_team") == "CT": - pred = [100, 0] - - """Virtual Round Win - Scenarios in which a team cannot lose, - but the round is still live""" + pred = [round(p * 100, 2) for p in pred_nested[0]] - # Bomb Timer < 5 seconds - if snapshot_formatted["phase_countdowns"].get("phase") == "bomb": - if float(snapshot_formatted["phase_countdowns"].get("phase_ends_in")) < 5.0: + # Override with deterministic outcomes when the round winner is known. + round_data = snapshot_formatted.get("round", {}) + if round_data.get("phase") == "over": + if round_data.get("win_team") == "T": pred = [0, 100] + elif round_data.get("win_team") == "CT": + pred = [100, 0] - """Time to Defuse > Time left in Round - cant do this with - existing info, solution may be possible (more info from GSI?)""" - - # Bomb Planted, All Ts dead - enough time to defuse - same as above - - print(pred) - with open('predictions.txt', 'a') as fh: # writes predictions to txt file - fh.write(str(pred)+'\n') - - # Check for Pause request (every 10 loops unless delay is specified) - if "-p" not in sys.argv: - if GetWindowText(GetForegroundWindow()) == window: - if delay_req is True: # when delay, check for pause after each loop - pause_detector() - elif pause_counter % 10 == 0: + # Bomb about to explode — T-side virtual win. + countdown = snapshot_formatted.get("phase_countdowns", {}) + if countdown.get("phase") == "bomb": + try: + if float(countdown.get("phase_ends_in", 999)) < config.BOMB_NEAR_EXPIRY_SECONDS: + pred = [0, 100] + except (TypeError, ValueError): + pass + + logger.info("Prediction: CT=%.2f%% T=%.2f%%", pred[0], pred[1]) + with open("predictions.txt", "a") as fh: + fh.write(str(pred) + "\n") + + # Poll for pause request (every 10 loops, or every loop when delayed) + if "-p" not in sys.argv and _WIN32_AVAILABLE: + if current_window() == window: + if delay_req or pause_counter % 10 == 0: pause_detector() - pause_counter += 1 + pause_counter += 1 - # Exceptions except exceptions.EmptyServer: - - """Raised by program when no players are found in the server. - Forces program to wait till at least one player is detected""" - print("Server is empty. Program will automatically resume once at least one player joins the server.") + logger.warning("Server is empty — waiting for a player to join...") time.sleep(1) - print("Waiting...") match_start_check_postlaunch() - print("Player(s) Detected!") + logger.info("Player(s) detected!") time.sleep(1) - continue except exceptions.MatchNotStarted: - - """Raised by program when player is not spectating a match. - Usually occurs when user goes into, then exits a match""" - print("You are not currently spectating a Match. Program will automatically resume when you begin spectating.") + logger.warning("Not spectating a match — waiting...") time.sleep(1) - print("Waiting...") match_start_check_postlaunch() - print("Match has Begun!") + logger.info("Match has begun!") time.sleep(1) - continue except exceptions.WarmUp: - - # Raised by program when match being spectated is in warm up mode - print("Match is in Warm Up Phase. Predictions will begin after Warm Up.") + logger.info("Match is in warm-up — predictions will begin after warm-up.") time.sleep(1) - print("Waiting...") - check_for_match_start() # For unknown reason, match_start_check_postlaunch() doesnt work here + check_for_match_start() time.sleep(1) - continue except KeyError: - - """A catch-all exception which restarts the loop. Should not occur. - Please report on GitHub if found.""" - print("KeyError. Restarting loop.") - print("This should not occur. Please raise a Ticket on GitHub!") + logger.warning("KeyError in main loop — restarting. Please report on GitHub if frequent.") time.sleep(5) - continue except KeyboardInterrupt: - - """Catches Command Terminal keyboard interrupts. Helps exit program smoothly. - Without this, program raises several errors.""" - print("Exiting Program") + logger.info("Keyboard interrupt received — exiting.") time.sleep(0.5) - try: - sys.exit() - except: - sys.exit() - - print("While loop in parse_and_predict somehow broken. This should not occur, please report on GitHub") - time.sleep(5) - sys.exit() + sys.exit(0) -if __name__ == '__main__': +if __name__ == "__main__": parse_and_predict() diff --git a/config.py b/config.py new file mode 100644 index 0000000..733164b --- /dev/null +++ b/config.py @@ -0,0 +1,54 @@ +"""Central configuration for CSGOPredictor. Edit these values to customize behavior.""" + +# GSI server settings +GSI_HOST = "localhost" +GSI_PORT = 3000 +GSI_AUTH_TOKEN = "odM6BOq8stAsOpRJK4hb" + +# Prediction thresholds +FREEZETIME_DEFAULT_SECONDS = 115 # time_left substituted during freeze so it doesn't skew T-side +BOMB_NEAR_EXPIRY_SECONDS = 5.0 # below this, treat bomb as virtual T win + +# Map encoding used during model training. +# de_ancient (added to pool 2021) was not in the training set; it gets its own +# code (8) so it no longer collides with de_nuke (4). Predictions on de_ancient +# will be less reliable than on maps the model was trained on. +MAP_CODES = { + "de_cache": 0, + "de_dust2": 1, + "de_inferno": 2, + "de_mirage": 3, + "de_nuke": 4, + "de_overpass": 5, + "de_train": 6, + "de_vertigo": 7, + "de_ancient": 8, +} +DEFAULT_MAP_CODE = 1 # falls back to de_dust2 for unknown maps + +# Weapon classification lists (must match training data) +PISTOLS_SPECIAL = frozenset([ + "weapon_cz75auto", "weapon_elite", "weapon_r8revolver", + "weapon_deagle", "weapon_fiveseven", "weapon_p250", "weapon_tec9", +]) +PISTOLS_STANDARD = frozenset([ + "weapon_usps", "weapon_glock", "weapon_hkp2000", +]) +PRIMARIES_FORCE = frozenset([ + "weapon_bizon", "weapon_famas", "weapon_galilar", "weapon_mac10", + "weapon_mag7", "weapon_mp5sd", "weapon_mp7", "weapon_mp9", + "weapon_negev", "weapon_nova", "weapon_p90", "weapon_sawedoff", + "weapon_ssg08", "weapon_ump45", "weapon_xm1014", +]) +PRIMARIES_FULLBUY = frozenset([ + "weapon_ak47", "weapon_aug", "weapon_awp", "weapon_g3sg1", + "weapon_m249", "weapon_m4a1s", "weapon_m4a1", "weapon_m4a1_silencer", + "weapon_m4a4", "weapon_scar20", "weapon_sg556", +]) +GRENADES = frozenset([ + "weapon_hegrenade", "weapon_frag_grenade", "weapon_flashbang", + "weapon_smokegrenade", "weapon_decoy", "weapon_molotov", "weapon_incgrenade", +]) +WEAPONS_IGNORE = frozenset([ + "weapon_knife", "weapon_knife_t", "weapon_c4", +]) diff --git a/gsi_pinger.py b/gsi_pinger.py index c630c27..2a90a11 100644 --- a/gsi_pinger.py +++ b/gsi_pinger.py @@ -2,52 +2,66 @@ snapshot of the round being spectated, for use by the predictive model. This is done through the use of its Gamestate Intergration functionality.""" +import json +import logging +from http.server import BaseHTTPRequestHandler, HTTPServer + +import config + +logger = logging.getLogger(__name__) + def pingerfunc(): - """The primary loop that opens a server, pings CSGO for a single snapshot, - closes the server and makes snapshot data available to MainApp.""" - from http.server import BaseHTTPRequestHandler, HTTPServer - import json - global snapshot - snapshot = None + """Opens a temporary HTTP server, receives one GSI snapshot from CS:GO, + closes the server, and returns the snapshot dict (or None on failure).""" + + snapshot_holder = [None] # mutable container so the inner class can write to it class GSIServer(HTTPServer): def __init__(self, server_address, token, RequestHandler): self.auth_token = token - super(GSIServer, self).__init__(server_address, RequestHandler) class RequestHandler(BaseHTTPRequestHandler): def do_POST(self): - global snapshot - length = int(self.headers['Content-Length']) - body = self.rfile.read(length).decode('utf-8') + length = int(self.headers["Content-Length"]) + body = self.rfile.read(length).decode("utf-8") payload = json.loads(body) - # Ignore unauthenticated payloads - if not self.authenticate_payload(payload): - return None - - snapshot = payload - exit() + if not self._authenticate(payload): + logger.debug("Rejected unauthenticated GSI payload") + return - self.send_header('Content-type', 'text/html') - self.send_response(200) - self.end_headers() + snapshot_holder[0] = payload + # Raise to unblock serve_forever(); caught below. + raise SystemExit - def authenticate_payload(self, payload): + # (unreachable — response headers intentionally omitted because + # CS:GO does not require a well-formed HTTP response here) - # Checks payload auth token against the token specified below - if 'auth' in payload and 'token' in payload['auth']: - return payload['auth']['token'] == server.auth_token - else: - return False + def _authenticate(self, payload): + return ( + "auth" in payload + and "token" in payload["auth"] + and payload["auth"]["token"] == self.server.auth_token + ) - server = GSIServer(('localhost', 3000), 'odM6BOq8stAsOpRJK4hb', RequestHandler) + def log_message(self, format, *args): # suppress per-request stderr noise + logger.debug("GSI request: %s", format % args) + server = GSIServer( + (config.GSI_HOST, config.GSI_PORT), + config.GSI_AUTH_TOKEN, + RequestHandler, + ) try: server.serve_forever() - except: + except SystemExit: + pass + except Exception: + logger.exception("Unexpected error in GSI server") + finally: server.server_close() - return snapshot + + return snapshot_holder[0] diff --git a/snapshot_parser.py b/snapshot_parser.py index e13aad0..34d7fd7 100644 --- a/snapshot_parser.py +++ b/snapshot_parser.py @@ -1,182 +1,170 @@ """Imports""" +import logging + +import config import exceptions +logger = logging.getLogger(__name__) -def exception_handler(snapshot): - # Checks to see if snapshot is valid. See exceptions.py for details +def exception_handler(snapshot): if snapshot.get("allplayers") is None: raise exceptions.MatchNotStarted - if snapshot.get("map").get("phase") == 'warmup': + if snapshot.get("map", {}).get("phase") == "warmup": raise exceptions.WarmUp - if len(snapshot["allplayers"].keys()) == 0: + if len(snapshot["allplayers"]) == 0: raise exceptions.EmptyServer return snapshot def snapshot_formatter(ssoriginal): - - # Converts Player names to Generic names for easy dict access - ss1 = ssoriginal - playernames = list(ss1["allplayers"].keys()) - ss1_ap = ss1["allplayers"] - for i in range(len(playernames)): - ss1_ap[str("player" + str(i+1))] = ss1_ap.pop(str(playernames[i])) - return ss1 + """Rename player keys to player1…playerN for deterministic dict access.""" + ss = ssoriginal + player_names = list(ss["allplayers"].keys()) + for i, name in enumerate(player_names): + ss["allplayers"][f"player{i + 1}"] = ss["allplayers"].pop(name) + return ss def snapshot_arrayfier(snapshot_formatted): - - """Creating list of attributes for prediction - - # # Order of attributes - - # # 'map', 'bomb_planted','ct_score', 't_score', 'time_left', - # # 'ct_players_alive','t_players_alive', 'ct_health', 't_health', - # # 'ct_armor', 't_armor','ct_pistols_special', 't_pistols_special', - # # 't_pistols_standard', 'ct_pistols_standard', 'ct_primaries_force', - # # 't_primaries_force', 'ct_primaries_fullbuy', 't_primaries_fullbuy', - # # 'ct_grenades', 't_grenades', 'ct_helmets', 't_helmets', 'ct_defuse_kits' - - # # Encoded Lables:Original Values - # # round_winner = {'CT': 0, 'T': 1} - # # map = {'de_cache': 0, 'de_dust2': 1, 'de_inferno': 2, 'de_mirage': 3, - # 'de_nuke': 4, 'de_overpass': 5, 'de_train': 6, 'de_vertigo': 7} - # # bomb_planted = {False: 0, True: 1} + """Build the 24-element feature vector expected by the trained model. + + Attribute order: + 0 map + 1 bomb_planted + 2 ct_score + 3 t_score + 4 time_left + 5 ct_players_alive + 6 t_players_alive + 7 ct_health + 8 t_health + 9 ct_armor + 10 t_armor + 11 ct_pistols_special + 12 t_pistols_special + 13 t_pistols_standard + 14 ct_pistols_standard + 15 ct_primaries_force + 16 t_primaries_force + 17 ct_primaries_fullbuy + 18 t_primaries_fullbuy + 19 ct_grenades + 20 t_grenades + 21 ct_helmets + 22 t_helmets + 23 ct_defuse_kits (not transmitted by GSI — always 0) """ - snap = snapshot_formatted - predictors = [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - ] - - # adding var0 - map - map_codes = { - 'de_cache': 0, 'de_dust2': 1, 'de_inferno': 2, 'de_mirage': 3, - 'de_nuke': 4, 'de_overpass': 5, 'de_train': 6, 'de_vertigo': 7, 'de_ancient': 4 - } - map_string = str(snap["map"]["name"]) - if map_codes.get(map_string) is None: - predictors[0] = 1 # Default Map set to de_dust2 (most balanced map) - else: - predictors[0] = map_codes.get(map_string) - - # adding var1 - bomb_planted - if "bomb" in snap["round"].keys(): + predictors = [0] * 24 + + # --- var 0: map --- + map_string = snap["map"]["name"] + predictors[0] = config.MAP_CODES.get(map_string, config.DEFAULT_MAP_CODE) + if map_string not in config.MAP_CODES: + logger.debug("Unknown map '%s' — defaulting to code %d", map_string, config.DEFAULT_MAP_CODE) + + # --- var 1: bomb planted --- + if "bomb" in snap.get("round", {}).keys(): predictors[1] = 1 - # adding var2,3 - scores + # --- vars 2, 3: scores --- predictors[2] = snap["map"]["team_ct"]["score"] predictors[3] = snap["map"]["team_t"]["score"] - # adding var4 - time_left - predictors[4] = int(float(snap["phase_countdowns"]["phase_ends_in"])) - - # adding var 5,6,7,8 - players alive and team health - counter_t = 0 - counter_ct = 0 - health_t = 0 - health_ct = 0 - for i in range(len(snap["allplayers"].keys())): - if snap["allplayers"][str("player" + str(i + 1))]["state"]["health"] > 0: - if snap["allplayers"][str("player" + str(i + 1))]["team"] == "T": - counter_t += 1 - health_t += snap["allplayers"][str("player" + str(i + 1))]["state"]["health"] - if snap["allplayers"][str("player" + str(i + 1))]["team"] == "CT": - counter_ct += 1 - health_ct += snap["allplayers"][str("player" + str(i + 1))]["state"]["health"] - - predictors[5] = counter_ct - predictors[6] = counter_t - predictors[7] = health_ct - predictors[8] = health_t - - # adding var 9,10 - armor - armor_t = 0 - armor_ct = 0 - - for i in range(len(snap["allplayers"].keys())): - if snap["allplayers"][str("player" + str(i + 1))]["state"]["armor"] > 0: - if snap["allplayers"][str("player" + str(i + 1))]["team"] == "T": - armor_t += snap["allplayers"][str("player" + str(i + 1))]["state"]["armor"] - if snap["allplayers"][str("player" + str(i + 1))]["team"] == "CT": - armor_ct += snap["allplayers"][str("player" + str(i + 1))]["state"]["armor"] - - predictors[9] = armor_ct - predictors[10] = armor_t - - # adding vars 11 to 20 - weapons and grenades - ct_pistols_special = 0; t_pistols_special = 0 - pistols_special = ["weapon_cz75auto", "weapon_elite", 'weapon_r8revolver', 'weapon_deagle', 'weapon_fiveseven', - 'weapon_p250', 'weapon_tec9'] - t_pistols_standard = 0; ct_pistols_standard = 0 - pistols_standard = ['weapon_usps', 'weapon_glock', 'weapon_hkp2000'] - ct_primaries_force = 0; t_primaries_force = 0 - primaries_force = ['weapon_bizon', 'weapon_famas', 'weapon_galilar', 'weapon_mac10', 'weapon_mag7', 'weapon_mp5sd', - 'weapon_mp7', 'weapon_mp9', 'weapon_negev', 'weapon_nova', 'weapon_p90', 'weapon_sawedoff', - 'weapon_ssg08', 'weapon_ump45', 'weapon_xm1014', ] - ct_primaries_fullbuy = 0; t_primaries_fullbuy = 0 - primaries_fullbuy = ['weapon_ak47', 'weapon_aug', 'weapon_awp', 'weapon_g3sg1', 'weapon_m249', 'weapon_m4a1s', "weapon_m4a1", - "weapon_m4a1_silencer", 'weapon_m4a4', 'weapon_scar20', 'weapon_sg556'] - ct_grenades = 0; t_grenades = 0 - grenades = ["weapon_hegrenade", "weapon_frag_grenade", "weapon_flashbang", "weapon_smokegrenade", "weapon_decoy", - "weapon_molotov", "weapon_incgrenade", ] - ignore = ["weapon_knife", "weapon_knife_t", "weapon_c4"] - - for i in range(len(snap["allplayers"].keys())): - player = str("player" + str(i + 1)) - if snap["allplayers"][player]["team"] == "T": - for iterator in range(len(list(snap["allplayers"][player]["weapons"].keys()))): - weapon = str("weapon_" + str(iterator)) - if snap["allplayers"][player]["weapons"][weapon]["name"] in ignore: - continue - elif snap["allplayers"][player]["weapons"][weapon]["name"] in pistols_special: - t_pistols_special += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in pistols_standard: - t_pistols_standard += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in primaries_force: - t_primaries_force += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in primaries_fullbuy: - t_primaries_fullbuy += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in grenades: - t_grenades += 1 - - elif snap["allplayers"][player]["team"] == "CT": - for iterator in range(len(list(snap["allplayers"][player]["weapons"].keys()))): - weapon = str("weapon_" + str(iterator)) - if snap["allplayers"][player]["weapons"][weapon]["name"] in ignore: - continue - elif snap["allplayers"][player]["weapons"][weapon]["name"] in pistols_special: + # --- var 4: time left --- + try: + predictors[4] = int(float(snap["phase_countdowns"]["phase_ends_in"])) + except (KeyError, TypeError, ValueError): + predictors[4] = 0 + + # --- vars 5-22: single pass over all players --- + ct_alive = ct_health = ct_armor = ct_helmets = 0 + t_alive = t_health = t_armor = t_helmets = 0 + ct_pistols_special = ct_pistols_standard = 0 + ct_primaries_force = ct_primaries_fullbuy = ct_grenades = 0 + t_pistols_special = t_pistols_standard = 0 + t_primaries_force = t_primaries_fullbuy = t_grenades = 0 + + players = snap["allplayers"] + for i in range(len(players)): + player = players[f"player{i + 1}"] + state = player.get("state", {}) + team = player.get("team", "") + health = state.get("health", 0) + is_ct = team == "CT" + is_t = team == "T" + + if health > 0: + if is_ct: + ct_alive += 1 + ct_health += health + elif is_t: + t_alive += 1 + t_health += health + + armor = state.get("armor", 0) + if armor > 0: + if is_ct: + ct_armor += armor + elif is_t: + t_armor += armor + + if state.get("helmet") is True: + if is_ct: + ct_helmets += 1 + elif is_t: + t_helmets += 1 + + weapons = player.get("weapons", {}) + for slot_idx in range(len(weapons)): + weapon_name = weapons.get(f"weapon_{slot_idx}", {}).get("name", "") + if weapon_name in config.WEAPONS_IGNORE: + continue + if weapon_name in config.PISTOLS_SPECIAL: + if is_ct: ct_pistols_special += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in pistols_standard: + elif is_t: + t_pistols_special += 1 + elif weapon_name in config.PISTOLS_STANDARD: + if is_ct: ct_pistols_standard += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in primaries_force: + elif is_t: + t_pistols_standard += 1 + elif weapon_name in config.PRIMARIES_FORCE: + if is_ct: ct_primaries_force += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in primaries_fullbuy: + elif is_t: + t_primaries_force += 1 + elif weapon_name in config.PRIMARIES_FULLBUY: + if is_ct: ct_primaries_fullbuy += 1 - elif snap["allplayers"][player]["weapons"][weapon]["name"] in grenades: + elif is_t: + t_primaries_fullbuy += 1 + elif weapon_name in config.GRENADES: + if is_ct: ct_grenades += 1 + elif is_t: + t_grenades += 1 - weapons_count = [ct_pistols_special, t_pistols_special, t_pistols_standard, ct_pistols_standard, - ct_primaries_force, t_primaries_force, ct_primaries_fullbuy, t_primaries_fullbuy, ct_grenades, t_grenades] - predictors[11:21] = weapons_count[0:10] - - # adding vars 21,22 - helmets - helmets_t = 0 - helmets_ct = 0 - - for i in range(len(snap["allplayers"].keys())): - player = str("player" + str(i + 1)) - if snap["allplayers"][player]["state"]["helmet"] is True: - if snap["allplayers"][player]["team"] == "T": - helmets_t += 1 - if snap["allplayers"][player]["team"] == "CT": - helmets_ct += 1 - - predictors[21] = helmets_ct - predictors[22] = helmets_t - - # adding var23 - defuse kits - # # data not transmitted, check it out later + predictors[5] = ct_alive + predictors[6] = t_alive + predictors[7] = ct_health + predictors[8] = t_health + predictors[9] = ct_armor + predictors[10] = t_armor + predictors[11] = ct_pistols_special + predictors[12] = t_pistols_special + predictors[13] = t_pistols_standard + predictors[14] = ct_pistols_standard + predictors[15] = ct_primaries_force + predictors[16] = t_primaries_force + predictors[17] = ct_primaries_fullbuy + predictors[18] = t_primaries_fullbuy + predictors[19] = ct_grenades + predictors[20] = t_grenades + predictors[21] = ct_helmets + predictors[22] = t_helmets + # predictors[23] = ct_defuse_kits (not transmitted via GSI) return predictors