diff --git a/py-scripts/real_application_tests/zoom_automation/android_zoom.py b/py-scripts/real_application_tests/zoom_automation/android_zoom.py new file mode 100644 index 000000000..b6303d398 --- /dev/null +++ b/py-scripts/real_application_tests/zoom_automation/android_zoom.py @@ -0,0 +1,565 @@ +#!/usr/bin/env python3 +from datetime import datetime, timedelta +import uiautomator2 as u2 +import time +import argparse +import re +import xml.etree.ElementTree as ET +from ppadb.client import Client as AdbClient +import requests +import pytz +import sys +import logging +import os + +# from ping_monitor import PingMonitor + + +class ZoomAutomator: + def __init__( + self, + host="127.0.0.1", + port=5037, + server_ip="127.0.0.1", + server_port=5000, + participant_name=None, + ): + self.host = host + self.port = port + self.client = AdbClient(host=host, port=port) + self.device_serial = None + self.u2_device = None + self.base_url = "http://{server_ip}:{server_port}".format( + server_ip=server_ip, server_port=server_port + ) + self.start_time = None + self.end_time = None + self.adb_device = None + self.stop_signal = False + self.tz = pytz.timezone("Asia/Kolkata") + self.participant_name = participant_name or "android_zoom" + self.logger = self._create_logger() + # self.ping_monitor = PingMonitor(self.participant_name) + + def _create_logger(self): + log_dir = os.path.join(os.getcwd(), "zoom_mobile_logs") + os.makedirs(log_dir, exist_ok=True) + + logger_name = f"{__name__}.{self.participant_name}" + logger = logging.getLogger(logger_name) + logger.setLevel(logging.INFO) + logger.propagate = False + + if not logger.handlers: + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + + file_handler = logging.FileHandler( + os.path.join(log_dir, f"{self.participant_name}.log"), mode="w" + ) + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + + @staticmethod + def _parse_bounds(bounds): + match = re.match(r"\[(\d+),(\d+)\]\[(\d+),(\d+)\]", bounds or "") + if not match: + return None + return tuple(map(int, match.groups())) + + def tap_bounds_center(self, d, bounds): + parsed = self._parse_bounds(bounds) + if not parsed: + return False + + left, top, right, bottom = parsed + d.click((left + right) // 2, (top + bottom) // 2) + return True + + def reveal_zoom_controls(self, d, tap_coords): + audio_state, _, _ = self.get_audio_control_info(d) + video_state, _, _ = self.get_video_control_info(d) + if audio_state is not None or video_state is not None: + return + + d.click(*tap_coords) + time.sleep(0.8) + + audio_state, _, _ = self.get_audio_control_info(d) + video_state, _, _ = self.get_video_control_info(d) + if audio_state is not None or video_state is not None: + return + + d.click(*tap_coords) + time.sleep(1) + + def get_audio_control_info(self, d): + """Return audio state and bounds by parsing the current hierarchy dump.""" + try: + root = ET.fromstring(d.dump_hierarchy()) + except Exception as e: + self.logger.error( + f"[{self.device_serial}] Failed to parse audio hierarchy: {e}" + ) + return None, None, None + + for node in root.iter("node"): + content_desc = node.attrib.get("content-desc", "") + if content_desc == "Mute my audio, button": + return True, node.attrib.get("bounds"), content_desc + if content_desc == "Unmute my audio, button": + return False, node.attrib.get("bounds"), content_desc + + return None, None, None + + def get_video_control_info(self, d): + """Return Video state and bounds by parsing the current hierarchy dump.""" + try: + root = ET.fromstring(d.dump_hierarchy()) + except Exception as e: + self.logger.error( + f"[{self.device_serial}] Failed to parse video hierarchy: {e}" + ) + return None, None, None + + for node in root.iter("node"): + content_desc = node.attrib.get("content-desc", "") + if content_desc == "Start my video, button": + return False, node.attrib.get("bounds"), content_desc + if content_desc == "Stop my video, button": + return True, node.attrib.get("bounds"), content_desc + + return None, None, None + + def get_leave_control_info(self, d): + """Return leave button bounds by parsing the current hierarchy dump.""" + try: + root = ET.fromstring(d.dump_hierarchy()) + except Exception as e: + self.logger.error( + f"[{self.device_serial}] Failed to parse leave hierarchy: {e}" + ) + return None, None + + for node in root.iter("node"): + content_desc = node.attrib.get("content-desc", "") + if content_desc == "Leave, button": + return node.attrib.get("bounds"), content_desc + + return None, None + + def set_device(self, serial): + """Set the target device for automation using its ADB serial number.""" + self.device_serial = serial + try: + # Get the device object via ADB client + self.adb_device = self.client.device(serial) + if self.adb_device is None: + raise Exception(f"Device with serial {serial} not found via ADB.") + + # Connect using uiautomator2 for UI interaction + self.u2_device = u2.connect(serial) + self.logger.info(f"[{serial}] Successfully connected to device.") + + except Exception as e: + self.logger.error(f"[{serial}] Failed to connect: {e}") + raise + + def start_interop_app(self): + if not self.adb_device: + raise RuntimeError("Device not set. Call set_device() first.") + self.logger.info(f"[{self.device_serial}] Launching Interop App...") + self.adb_device.shell("am force-stop us.zoom.videomeetings") + time.sleep(1) + self.adb_device.shell("am force-stop com.candela.wecan") + time.sleep(1) + self.adb_device.shell( + "am start --es auto_start 1 -n com.candela.wecan/com.candela.wecan.StartupActivity" + ) + time.sleep(5) + self.logger.info(f"[{self.device_serial}] Interop App launched successfully.") + + def check_stop_signal(self): + """Check the stop signal from the Flask server.""" + try: + endpoint_url = f"{self.base_url}/check_stop" + + response = requests.get(endpoint_url, timeout=10) + if response.status_code == 200: + + stop_signal_from_server = response.json().get("stop", False) + + # Only update if the server's stop signal is True + if stop_signal_from_server: + self.stop_signal = True + self.logger.info( + "Stop signal received from the server. Exiting the loop." + ) + else: + self.logger.info( + "No stop signal received from the server. Continuing." + ) + return self.stop_signal + except Exception as e: + self.logger.error(f"Error checking stop signal: {e}") + return self.stop_signal + + def join_zoom_meeting(self, meeting_url, participant_name): + if not self.u2_device: + raise RuntimeError("Device not set. Call set_device() first.") + + serial = self.device_serial + d = self.u2_device + try: + width, height = d.window_size() + except Exception: + # fallback defaults as it throws error in some devices + width, height = 500, 1000 + + self.logger.info(f"[{serial}] Starting Zoom automation for: {participant_name}") + + # 1. Launch Zoom using the meeting link + self.logger.info(f"[{serial}] Launching Zoom app with meeting link...") + d.app_start("us.zoom.videomeetings", stop=True) + time.sleep(2) + + self.adb_device.shell( + f'am start -a android.intent.action.VIEW -d "{meeting_url}"' + ) + time.sleep(8) + + # 2. Handle permission prompts first + self.logger.info(f"[{serial}] Checking for permission prompts...") + allow_while_using = d(text="While using the app") + if allow_while_using.wait(timeout=8): + allow_while_using.click() + self.logger.info(f"[{serial}] Granted 'While using the app' permission") + time.sleep(2) + + for permission_text in ["Allow", "ALLOW"]: + allow_btn = d(text=permission_text, className="android.widget.Button") + if allow_btn.wait(timeout=5): + allow_btn.click() + self.logger.info(f"[{serial}] Clicked {permission_text}") + time.sleep(1) + + # 3. Detect preview screen + preview_join = d(text="Editing display name") + if preview_join.wait(timeout=5): + self.logger.info(f"[{serial}] Preview screen detected.") + + # Enter name if field is present + name_input = d(className="android.widget.EditText") + if name_input.exists: + self.logger.info( + f"[{serial}] Entering participant name: {participant_name}" + ) + name_input.set_text(participant_name) + time.sleep(1) + d(text="OK").click() + self.logger.info(f"[{serial}] Clicked 'Join' on preview screen.") + # Tap join on preview + d(text="Join").click() + self.logger.info(f"[{serial}] Clicked 'Join' on preview screen.") + + else: + # 4. Old flow: check for name input screen + name_input = d(resourceId="us.zoom.videomeetings:id/edtScreenName") + if name_input.wait(timeout=15): + self.logger.info( + f"[{serial}] Entering participant name: {participant_name}" + ) + name_input.set_text(participant_name) + time.sleep(1) + ok_btn = d(text="OK", className="android.widget.Button") + if ok_btn.exists: + ok_btn.click() + else: + d(resourceId="us.zoom.videomeetings:id/button1").click() + self.logger.info(f"[{serial}] Clicked 'Ok Button'") + else: + self.logger.warning( + f"[{serial}] Name input screen not found. Proceeding..." + ) + + # 5. Wait to join the meeting + self.logger.info(f"[{serial}] Waiting to join meeting...") + time.sleep(10) + + # Reveal controls before checking meeting state or toggles. + self.reveal_zoom_controls(d, (width // 2, height // 2)) + + # 6. Check if in meeting + leave_bounds, _leave_status = self.get_leave_control_info(d) + if leave_bounds: + self.logger.info( + f"[{serial}] Successfully joined the meeting as {participant_name}." + ) + else: + self.logger.warning( + f"[{serial}] Leave button not found. Checking toolbar..." + ) + if d(resourceId="us.zoom.videomeetings:id/panelMeetingToolbar").exists: + self.logger.info( + f"[{serial}] Found meeting toolbar - likely in meeting." + ) + + time.sleep(2) + self.enable_audio_video(d, tap_coords=(width // 2, height // 2)) + time.sleep(2) + count = 0 + while self.end_time is None: + count += 1 + if count > 60: + self.logger.error( + f"[{serial}] Failed to retrieve meeting end time from server after 5 minutes. Leaving meeting." + ) + sys.exit(1) + try: + self.get_start_and_end_time() + time.sleep(5) + except Exception as e: + self.logger.error(f"[{serial}] Error fetching start/end time: {e}") + time.sleep(5) + self.logger.info( + f"[{serial}] Meeting scheduled from {self.start_time} to {self.end_time}" + ) + try: + end_dt = datetime.fromisoformat(self.end_time.replace("Z", "+00:00")) + if end_dt.tzinfo is None: + end_dt = self.tz.localize(end_dt) + else: + end_dt = end_dt.astimezone(self.tz) + meeting_end_dt = end_dt - timedelta(seconds=10) + except Exception as e: + raise RuntimeError(f"Invalid end_time received from server: {e}") + + # self.ping_monitor.start_ping(self.device_serial) + while datetime.now(self.tz) < meeting_end_dt: + if self.check_stop_signal(): + self.logger.info( + f"[{serial}] Stop signal received. Leaving meeting early." + ) + break + time.sleep(2) + + # 7. Stay in the meeting + try: + self.reveal_zoom_controls(d, (width // 2, height // 2)) + + # 8. Leave the meeting + self.logger.info(f"[{serial}] Leaving meeting...") + leave_bounds, _leave_status = self.get_leave_control_info(d) + if leave_bounds and self.tap_bounds_center(d, leave_bounds): + time.sleep(2) + leave_confirm = d(text="Leave meeting") + if leave_confirm.wait(timeout=5): + leave_confirm.click() + self.logger.info(f"[{serial}] Confirmed leaving meeting.") + else: + self.logger.warning( + f"[{serial}] Leave button not found. Pressing back..." + ) + d.press("back") + time.sleep(1) + d.press("back") + except Exception as e: + self.logger.warning( + f"Leave operation not executed, meeting might be ended from host side: {e}" + ) + + def get_start_and_end_time(self): + endpoint_url = f"{self.base_url}/get_start_end_time" + try: + response = requests.get(endpoint_url, timeout=10) + if response.status_code == 200: + data = response.json() + self.start_time = data.get("start_time") + self.end_time = data.get("end_time") + else: + self.logger.error( + f"Failed to fetch start and end time. Status code: {response.status_code}" + ) + except requests.RequestException as e: + self.logger.error(f"Request error: {e}") + + def enable_audio_video(self, d, max_retries=15, tap_coords=(500, 500)): + """ + Continuously check and enable audio and video until both are enabled or retries exhausted. + """ + serial = self.device_serial + self.logger.info(f"[{serial}] Ensuring audio and video are enabled...") + + retries = 0 + audio_enabled = False + video_enabled = False + + while retries < max_retries and not (audio_enabled and video_enabled): + retries += 1 + self.logger.info(f"[{serial}] Check attempt {retries}/{max_retries}") + + self.reveal_zoom_controls(d, tap_coords) + if not audio_enabled: + # --- AUDIO check --- + try: + audio_enabled_state, audio_bounds, audio_status = ( + self.get_audio_control_info(d) + ) + self.logger.info(f"[{serial}] Audio status: {audio_status}") + + if audio_enabled_state is True: + self.logger.info(f"[{serial}] Audio already enabled") + audio_enabled = True + elif audio_enabled_state is False: + self.logger.info(f"[{serial}] Audio is disabled. Enabling...") + if self.tap_bounds_center(d, audio_bounds): + time.sleep(1) + ( + audio_enabled_state, + _audio_bounds, + audio_status, + ) = self.get_audio_control_info(d) + self.logger.info( + f"[{serial}] Audio status after tap: {audio_status}" + ) + if audio_enabled_state is True: + self.logger.info(f"[{serial}] Audio enabled") + audio_enabled = True + else: + self.logger.warning( + f"[{serial}] Audio button bounds missing: {audio_bounds}" + ) + else: + join_audio = d(text="Join Audio") + if join_audio.exists: + self.logger.info( + f"[{serial}] Audio prompt found. Joining audio..." + ) + join_audio.click() + time.sleep(1) + else: + self.logger.warning(f"[{serial}] Audio button not visible") + except Exception as e: + self.logger.error(f"[{serial}] Error checking audio: {e}") + + # --- VIDEO check --- + if not video_enabled: + try: + video_enabled_state, video_bounds, video_status = ( + self.get_video_control_info(d) + ) + self.logger.info(f"[{serial}] Video status: {video_status}") + + if video_enabled_state is True: + self.logger.info(f"[{serial}] Video already enabled") + video_enabled = True + elif video_enabled_state is False: + self.logger.info(f"[{serial}] Video is disabled. Enabling...") + if self.tap_bounds_center(d, video_bounds): + time.sleep(1) + ( + video_enabled_state, + _video_bounds, + video_status, + ) = self.get_video_control_info(d) + self.logger.info( + f"[{serial}] Video status after tap: {video_status}" + ) + if video_enabled_state is True: + self.logger.info(f"[{serial}] Video enabled") + video_enabled = True + else: + self.logger.warning( + f"[{serial}] Video button bounds missing: {video_bounds}" + ) + else: + join_video = d(text="Join Video") + if join_video.exists: + self.logger.info( + f"[{serial}] Video prompt found. Joining video..." + ) + join_video.click() + time.sleep(1) + else: + self.logger.warning(f"[{serial}] Video button not visible") + except Exception as e: + self.logger.error(f"[{serial}] Error checking video: {e}") + + time.sleep(2) + + if audio_enabled and video_enabled: + self.logger.info(f"[{serial}] Both audio and video are enabled.") + else: + self.logger.warning( + f"[{serial}]Could not fully enable audio/video after {max_retries} retries." + ) + + def upload_ping_log(self): + log_path = os.path.join( + os.getcwd(), "zoom_mobile_logs", f"{self.participant_name}_ping.log" + ) + if not os.path.exists(log_path): + self.logger.warning(f"Ping log not found: {log_path}") + return + + endpoint_url = f"{self.base_url}/upload_ping_log" + try: + with open(log_path, "rb") as fp: + files = {"file": (os.path.basename(log_path), fp, "text/plain")} + data = {"participant_name": self.participant_name} + resp = requests.post(endpoint_url, files=files, data=data, timeout=30) + + if resp.status_code == 200: + self.logger.info( + f"[{self.device_serial}] Ping log uploaded successfully" + ) + else: + self.logger.error( + f"[{self.device_serial}] Ping log upload failed: {resp.status_code} {resp.text}" + ) + except Exception as e: + self.logger.error(f"[{self.device_serial}] Error uploading ping log: {e}") + + +def main(): + parser = argparse.ArgumentParser( + description="Automate joining a Zoom meeting on a single Android device." + ) + parser.add_argument("--serial", help="ADB serial number of the target device") + parser.add_argument("--meeting_url", help="Zoom meeting URL or deep link") + parser.add_argument( + "--participant_name", help="Name to use when joining the meeting" + ) + parser.add_argument("--server_host", default="0.0.0.0", help="flask server host") + parser.add_argument( + "--server_port", type=int, default=5000, help="flask server port" + ) + + args = parser.parse_args() + + automator = ZoomAutomator( + server_ip=args.server_host, + server_port=args.server_port, + participant_name=args.participant_name, + ) + try: + automator.set_device(args.serial) + automator.join_zoom_meeting(args.meeting_url, args.participant_name) + except Exception as e: + automator.logger.error(f"Error: {e}") + finally: + try: + # automator.upload_ping_log() + automator.start_interop_app() + except Exception as e: + automator.logger.error(f"Error during cleanup: {e}") + + +if __name__ == "__main__": + main() diff --git a/py-scripts/real_application_tests/zoom_automation/lf_interop_zoom.py b/py-scripts/real_application_tests/zoom_automation/lf_interop_zoom.py index 6f9ccdf83..501b60c65 100644 --- a/py-scripts/real_application_tests/zoom_automation/lf_interop_zoom.py +++ b/py-scripts/real_application_tests/zoom_automation/lf_interop_zoom.py @@ -1,37 +1,54 @@ #!/usr/bin/env python3 """ - NAME: lf_interop_zoom.py +NAME: lf_interop_zoom.py - PURPOSE: lf_interop_zoom.py provides the available devices and allows the user to start Zoom call conference meeting for the user-specified duration +PURPOSE: lf_interop_zoom.py provides the available devices and allows the user to start Zoom call conference meeting for the user-specified duration - EXAMPLE-1: - Command Line Interface to run Zoom with specified duration: - python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" --participants 3 --audio --video --upstream_port 192.168.214.123 +EXAMPLE-1: +Command Line Interface to run Zoom with specified duration: +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" +--participants 3 --audio --video --upstream_port 192.168.214.123 --api_stats_collection --env_file .env - EXAMPLE-2: - Command Line Interface to run Zoom on multiple devices: - python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" --participants 3 --audio --video - --resources 1.400,1.375 --zoom_host 1.95 --upstream_port 192.168.214.123 +EXAMPLE-2: +Command Line Interface to run Zoom on multiple devices: +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" +--participants 3 --audio --video --api_stats_collection --env_file .env --resources 1.400,1.375 --zoom_host 1.95 --upstream_port 192.168.214.123 - Example-3: - Command Line Interface to run Zoom on multiple devices with Device Configuration - python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.204.74" --signin_email "Demo@gmail.com" --signin_passwd "Demo@10203000" --participants 2 --audio --video - --upstream_port 1.1.eth1 --zoom_host 1.95 --resources 1.400,1.360 --ssid NETGEAR_2G_wpa2 --passwd Password@123 --encryp wpa2 --config +Example-3: +Command Line Interface to run Zoom on multiple devices with Device Configuration +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.204.74" --signin_email "Demo@gmail.com" --signin_passwd "Demo@10203000" --participants 2 --audio --video +--upstream_port 1.1.eth1 --zoom_host 1.95 --resources 1.400,1.360 --ssid NETGEAR_2G_wpa2 --passwd Password@123 --encryp wpa2 --config - Example-4: - Command Line Interface to run Zoom on multiple devices with Groups and Profiles - python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.204.74" --signin_email "Demo@gmail.com" --signin_passwd "Demo@10203000" --participants 2 --audio --video - --wait_time 30 --group_name group1,group2 --profile_name netgear5g,netgear2g --file_name grplaptops.csv --zoom_host 1.95 --upstream_port 1.1.eth1 +Example-4: +Command Line Interface to run Zoom on multiple devices with Groups and Profiles +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.204.74" --signin_email "Demo@gmail.com" --signin_passwd "Demo@10203000" --participants 2 --audio --video +--wait_time 30 --group_name group1,group2 --profile_name netgear5g,netgear2g --file_name grplaptops.csv --zoom_host 1.95 --upstream_port 1.1.eth1 +Example-5: +Command Line Interface to run Zoom test with robo feature +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" +--participants 3 --audio --video --upstream_port 192.168.214.123 --robo_ip 192.168.200.131 --coordinates 1,2 --rotations 30,40 --do_robo --api_stats_collection --env_file .env --download_csv +Example-6: +Command Line Interface to get Mos Score in the report: +python3 lf_interop_zoom.py --duration 1 --lanforge_ip "192.168.214.219" --signin_email "demo@gmail.com" --signin_passwd "Demo@123" +--participants 3 --audio --video --resources 1.400,1.375 --zoom_host 1.95 --upstream_port 1.1.eth1 --api_stats_collection --env_file .env --download_csv - NOTES: - 1. Use './lf_interop_zoom.py --help' to see command line usage and options. - 2. Always specify the duration in minutes (for example: --duration 3 indicates a duration of 3 minutes). - 3. If --resources are not given after passing the CLI, a list of available devices (laptops) will be displayed on the terminal. - 4. Enter the resource numbers separated by commas (,) in the resource argument. +Example-7: +Command Line Interface to run Zoom test with robo feature and BS: +python3 lf_interop_zoom.py --lanforge_ip "10.17.1.208" --signin_email "demo@gmail.com" --signin_passwd "demo123" +--participants 2 --audio --video --upstream_port 10.17.1.68 --robo_ip 127.0.0.1:6000 --coordinates 1,2 --cycles 2 +--do_bs --api_stats_collection --env_file .env --bssids 00:11:22:33:44:55,66:77:88:99:AA:BB + + +NOTES: +1. Use './lf_interop_zoom.py --help' to see command line usage and options. +2. Always specify the duration in minutes (for example: --duration 3 indicates a duration of 3 minutes). +3. If --resources are not given after passing the CLI, a list of available devices (laptops) will be displayed on the terminal. +4. Enter the resource numbers separated by commas (,) in the resource argument. """ + import os import csv import time @@ -47,13 +64,20 @@ import logging import json import asyncio -import redis import sys import traceback import textwrap - -sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) -sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../..')) +from requests.auth import HTTPBasicAuth +from dotenv import load_dotenv +import re +import glob +from collections import Counter +import signal +import platform +import subprocess + +sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) +sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "../..")) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))) @@ -76,29 +100,76 @@ RealDevice = lf_base_interop_profile.RealDevice # Set up logging +flask_server_logger = logging.getLogger(__name__) +flask_server_log = logging.getLogger("werkzeug") +flask_server_log.setLevel(logging.ERROR) + +# 1. Configure the logging system +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler("lf_interop_zoom.log", mode="w"), # Writes to file + logging.StreamHandler(sys.stdout), # Writes to terminal + ], +) + +# 2. Create the logger instance logger = logging.getLogger(__name__) -log = logging.getLogger('werkzeug') -log.setLevel(logging.ERROR) - -# Import LF logger configuration module lf_logger_config = importlib.import_module("py-scripts.lf_logger_config") +robo_base_class = importlib.import_module("py-scripts.lf_base_robo") + class ZoomAutomation(Realm): - def __init__(self, ssid="SSID", band="5G", security="wpa2", apname="AP Name", audio=True, video=True, lanforge_ip=None, - upstream_port='0.0.0.0', wait_time=30, devices=None, testname=None, config=None, selected_groups=None, selected_profiles=None): + def __init__( + self, + ssid="SSID", + band="5G", + security="wpa2", + apname="AP Name", + audio=True, + video=True, + lanforge_ip=None, + upstream_port="0.0.0.0", + wait_time=30, + devices=None, + testname=None, + config=None, + selected_groups=None, + selected_profiles=None, + robo_ip="127.0.0.1", + coordinates_list=None, + angles_list=None, + do_robo=False, + current_cord="", + current_angle="", + rotations_enabled=False, + signin_email="", + signin_passwd="", + duration=None, + participants_req=None, + env_file=None, + do_bs=False, + api_stats_collection=False, + do_webui=False, + cycles=1, + bssids=None, + wait_at_point=30, + resource_ip=None, + do_roam=False, + ): super().__init__(lfclient_host=lanforge_ip) self.upstream_port = upstream_port self.mgr_ip = lanforge_ip self.app = Flask(__name__) - self.redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) - self.redis_client.set('login_completed', 0) self.devices = devices self.windows = 0 self.linux = 0 self.mac = 0 + self.android = 0 self.real_sta_os_type = [] self.real_sta_hostname = [] self.real_sta_list = [] @@ -107,27 +178,31 @@ def __init__(self, ssid="SSID", band="5G", security="wpa2", apname="AP Name", au self.login_completed = False # Initially set to False self.remote_login_url = "" # Initialize remote login URL self.remote_login_passwd = "" # Initialize remote login password - self.signin_email = "" - self.signin_passwd = "" + self.signin_email = signin_email + self.signin_passwd = signin_passwd self.test_start = False self.start_time = None self.end_time = None self.participants_joined = 0 - self.participants_req = None + self.participants_req = participants_req self.ap_name = apname self.ssid = ssid self.band = band self.security = security - self.tz = pytz.timezone('Asia/Kolkata') + self.tz = pytz.timezone("Asia/Kolkata") self.meet_link = None self.zoom_host = None self.testname = testname self.stop_signal = False + self.download_csv = False + self.csv_file_name = "csvdata.csv" self.path = os.path.join(os.getcwd(), "zoom_test_results") if not os.path.exists(self.path): os.makedirs(self.path) + self.device_names = [] self.hostname_os_combination = None + self.clients_disconnected = False self.audio = audio self.video = video @@ -136,155 +211,379 @@ def __init__(self, ssid="SSID", band="5G", security="wpa2", apname="AP Name", au self.generic_endps_profile.name_prefix = "zoom" self.generic_endps_profile.type = "zoom" self.data_store = {} - self.header = ["timestamp", - "Sent Audio Frequency (khz)", "Sent Audio Latency (ms)", "Sent Audio Jitter (ms)", "Sent Audio Packet loss (%)", - "Receive Audio Frequency (khz)", "Receive Audio Latency (ms)", "Receive Audio Jitter (ms)", "Receive Audio Packet loss (%)", - "Sent Video Latency (ms)", "Sent Video Jitter (ms)", "Sent Video Packet loss (%)", "Sent Video Resolution (khz)", - "Sent Video Frames ps (khz)", "Receive Video Latency (ms)", "Receive Video Jitter (ms)", "Receive Video Packet loss (%)", - "Receive Video Resolution (khz)", "Receive Video Frames ps (khz)" - ] + self.header = [ + "timestamp", + "Sent Audio Frequency (khz)", + "Sent Audio Latency (ms)", + "Sent Audio Jitter (ms)", + "Sent Audio Packet loss (%)", + "Receive Audio Frequency (khz)", + "Receive Audio Latency (ms)", + "Receive Audio Jitter (ms)", + "Receive Audio Packet loss (%)", + "Sent Video Latency (ms)", + "Sent Video Jitter (ms)", + "Sent Video Packet loss (%)", + "Sent Video Resolution (khz)", + "Sent Video Frames ps (khz)", + "Receive Video Latency (ms)", + "Receive Video Jitter (ms)", + "Receive Video Packet loss (%)", + "Receive Video Resolution (khz)", + "Receive Video Frames ps (khz)", + ] self.config = config - self.selected_groups = selected_groups - self.selected_profiles = selected_profiles - self.config_obj = None + self.selected_groups = list(selected_groups or []) + self.selected_profiles = list(selected_profiles or []) + self.duration = duration + # Single container for raw Zoom QoS and summarized report data. + self.zoom_stats_data = {"raw_qos": [], "summary": {}} + self.env_file = env_file + + self.do_robo = do_robo + self.do_bs = do_bs + if self.do_robo or self.do_bs: + self.robo_ip = robo_ip + self.robo_obj = robo_base_class.RobotClass( + robo_ip=self.robo_ip, angle_list=angles_list + ) + self.coordinates_list = coordinates_list + self.angles_list = angles_list + self.current_cord = current_cord + self.current_angle = current_angle + self.rotations_enabled = rotations_enabled + self.robo_csv_files = [] + + self.account_id = None + self.client_id = None + self.client_secret = None + self.api_stats_collection = api_stats_collection + self.do_webui = do_webui + self.cycles = cycles + self.from_cord = None + self.to_cord = None + self.bssids = bssids or [] + logger.info("Zoom Automation Initialized with the following parameters:") + if self.do_bs: + self.robo_obj.coordinate_list = self.coordinates_list + self.robo_obj.total_cycles = self.cycles + logger.info( + f"User mentioned coordinates list: {self.robo_obj.coordinate_list}" + ) + self.successful_coords = [] + self.failed_coords = [] + self.is_csv_available = False + self.wait_at_point = int(wait_at_point) + self.resource_ip = resource_ip + + def stop_previous_flask_server(self): + """ + Forcefully kills any process currently listening on port 5000 (Linux/Darwin only). + """ + port = 5000 + logger.info( + f"Checking for processes using port {port} to forcefully kill them..." + ) + + current_os = platform.system() + + try: + if current_os in ["Linux", "Darwin"]: + # Find PID on Linux/Mac using lsof + command = f"lsof -t -i:{port}" + try: + output = subprocess.check_output(command, shell=True, text=True) + pids = output.strip().split("\n") + for pid in pids: + if pid.strip(): + logger.info( + f"Killing process {pid} on port {port} ({current_os})..." + ) + os.kill(int(pid.strip()), signal.SIGKILL) + except subprocess.CalledProcessError: + logger.info(f"No process found using port {port} on {current_os}.") + logger.info(f"Port {port} is clear, ready to start Flask server.") + pass + else: + logger.warning( + f"Unsupported OS: {current_os}. Expected Linux or Darwin. Cannot automatically clear port {port}." + ) + + except Exception as e: + logger.warning(f"Error while trying to clear port {port}: {e}") + + def move_ping_logs(self): + source_dir = os.path.join(self.path, "ping_logs") + if not os.path.isdir(source_dir): + logger.info(f"No ping_logs directory found at {source_dir}") + return + + destination_dir = os.path.join(self.report_path_date_time, "ping_logs") + os.makedirs(self.report_path_date_time, exist_ok=True) + + # If destination exists, merge files and remove source + if os.path.exists(destination_dir): + for file_name in os.listdir(source_dir): + src_file = os.path.join(source_dir, file_name) + dst_file = os.path.join(destination_dir, file_name) + if os.path.isfile(src_file): + shutil.move(src_file, dst_file) + shutil.rmtree(source_dir, ignore_errors=True) + logger.info(f"Merged ping logs into {destination_dir}") + else: + shutil.move(source_dir, destination_dir) + logger.info(f"Moved ping logs folder to {destination_dir}") + + def handle_flask_server(self): + self.stop_previous_flask_server() + time.sleep(5) # Ensure the port is released before starting the server + flask_thread = threading.Thread(target=self.start_flask_server) + flask_thread.daemon = True + flask_thread.start() + self.wait_for_flask() def start_flask_server(self): - @self.app.route('/login_url', methods=['GET', 'POST']) + @self.app.route("/login_url", methods=["GET", "POST"]) def login_url(): - if request.method == 'GET': + if request.method == "GET": return jsonify({"login_url": self.remote_login_url}) - elif request.method == 'POST': + elif request.method == "POST": data = request.json - self.remote_login_url = data.get('login_url', '') - return jsonify({"message": f"Updated login_url to {self.remote_login_url}"}) + self.remote_login_url = data.get("login_url", "") + return jsonify( + {"message": f"Updated login_url to {self.remote_login_url}"} + ) - @self.app.route('/login_passwd', methods=['GET', 'POST']) + @self.app.route("/login_passwd", methods=["GET", "POST"]) def login_passwd(): - if request.method == 'GET': + if request.method == "GET": return jsonify({"login_passwd": self.remote_login_passwd}) - elif request.method == 'POST': + elif request.method == "POST": data = request.json - self.remote_login_passwd = data.get('login_passwd', '') + self.remote_login_passwd = data.get("login_passwd", "") return jsonify({"message": "Password updated successfully."}) - @self.app.route('/meeting_link', methods=['GET', 'POST']) + @self.app.route("/meeting_link", methods=["GET", "POST"]) def meeting_link(): - if request.method == 'GET': + if request.method == "GET": return jsonify({"meet_link": self.meet_link}) - elif request.method == 'POST': + elif request.method == "POST": data = request.json - self.meet_link = data.get('meet_link', '') + self.meet_link = data.get("meet_link", "") + self.meet_link = self.meet_link.rsplit(".", 1)[0] + ".1" + + logger.info(f"Zoom host Updated Meet link: {self.meet_link}") return jsonify({"message": "Meeting Link Updated sucessfully"}) - @self.app.route('/login_completed', methods=['GET', 'POST']) + @self.app.route("/login_completed", methods=["GET"]) def login_completed(): - if request.method == 'GET': - login_completed_status = self.redis_client.get('login_completed') - return jsonify({"login_completed": bool(int(login_completed_status)) if login_completed_status else False}) - - elif request.method == 'POST': - data = request.json - login_completed_status = int(data.get('login_completed', 0)) - self.redis_client.set('login_completed', login_completed_status) - return jsonify({"message": f"Updated login_completed status to {bool(login_completed_status)}"}) + if request.method == "GET": + self.login_completed = True + return jsonify({"status": "login_completed"}), 200 - @self.app.route('/get_host_email', methods=['GET']) + @self.app.route("/get_host_email", methods=["GET"]) def get_host_email(): return jsonify({"host_email": self.signin_email}) - @self.app.route('/get_host_passwd', methods=['GET']) + @self.app.route("/get_host_passwd", methods=["GET"]) def get_host_passwd(): return jsonify({"host_passwd": self.signin_passwd}) - @self.app.route('/get_participants_joined', methods=['GET']) + @self.app.route("/get_participants_joined", methods=["GET"]) def get_participants_joined(): return jsonify({"participants": self.participants_joined}) - @self.app.route('/set_participants_joined', methods=['POST']) + @self.app.route("/set_participants_joined", methods=["POST"]) def set_participants_joined(): data = request.json - self.participants_joined = data.get('participants_joined', None) - return jsonify({"message": f"Updated participants jopind status to {self.participants_joined}"}) + self.participants_joined = data.get("participants_joined", None) + return jsonify( + { + "message": f"Updated participants joined status to {self.participants_joined}" + } + ) - @self.app.route('/get_participants_req', methods=['GET']) + @self.app.route("/get_participants_req", methods=["GET"]) def get_participants_req(): return jsonify({"participants": self.participants_req}) - @self.app.route('/test_started', methods=['GET', 'POST']) + @self.app.route("/test_started", methods=["GET", "POST"]) def test_started(): - if request.method == 'GET': + if request.method == "GET": return jsonify({"test_started": self.test_start}) - elif request.method == 'POST': + elif request.method == "POST": data = request.json - self.test_start = data.get('test_started', False) - return jsonify({"message": f"Updated test_start status to {self.test_start}"}) + self.test_start = data.get("test_started", False) + return jsonify( + {"message": f"Updated test_start status to {self.test_start}"} + ) - @self.app.route('/clients_disconnected', methods=['POST']) + @self.app.route("/clients_disconnected", methods=["POST"]) def client_disconnected(): data = request.json - self.clients_disconnected = data.get('clients_disconnected', False) - return jsonify({"message": f"Updated clients_disconnected status to {self.clients_disconnected}"}) + self.clients_disconnected = data.get("clients_disconnected", False) + return jsonify( + { + "message": f"Updated clients_disconnected status to {self.clients_disconnected}" + } + ) - @self.app.route('/get_start_end_time', methods=['GET']) + @self.app.route("/get_start_end_time", methods=["GET"]) def get_start_end_time(): - return jsonify({ - "start_time": self.start_time.isoformat() if self.start_time is not None else None, - "end_time": self.end_time.isoformat() if self.end_time is not None else None - }) + return jsonify( + { + "start_time": ( + self.start_time.isoformat() + if self.start_time is not None + else None + ), + "end_time": ( + self.end_time.isoformat() if self.end_time is not None else None + ), + } + ) - @self.app.route('/stats_opt', methods=['GET']) + @self.app.route("/stats_opt", methods=["GET"]) def stats_to_be_collected(): - return jsonify({ - 'audio_stats': self.audio, - "video_stats": self.video - }) + return jsonify({"audio_stats": self.audio, "video_stats": self.video}) - @self.app.route('/check_stop', methods=['GET']) + @self.app.route("/check_stop", methods=["GET"]) def check_stop(): return jsonify({"stop": self.stop_signal}) - @self.app.route('/upload_stats', methods=['POST']) + @self.app.route("/upload_stats", methods=["POST", "GET"]) def upload_stats(): - data = request.json - for hostname, stats in data.items(): - self.data_store[hostname] = stats - for hostname, stats in data.items(): - - csv_file = os.path.join(self.path, f'{hostname}.csv') - with open(csv_file, mode='a', newline='') as file: - writer = csv.writer(file) - - if os.path.getsize(csv_file) == 0: - writer.writerow(self.header) - - timestamp = stats.get('timestamp', '') - audio = stats.get('audio_stats', {}) - video = stats.get('video_stats', {}) - - row = [ - timestamp, - audio.get('frequency_sent', '0'), audio.get('latency_sent', '0'), audio.get('jitter_sent', '0'), audio.get('packet_loss_sent', '0'), - audio.get('frequency_received', '0'), audio.get('latency_received', '0'), audio.get('jitter_received', '0'), audio.get('packet_loss_received', '0'), - video.get('latency_sent', '0'), video.get('jitter_sent', '0'), video.get('packet_loss_sent', '0'), - video.get('resolution_sent', '0'), video.get('frames_per_second_sent', '0'), - video.get('latency_received', '0'), video.get('jitter_received', '0'), video.get('packet_loss_received', '0'), - video.get('resolution_received', '0'), video.get('frames_per_second_received', '0') - ] - writer.writerow(row) - - return jsonify({"status": "success"}), 200 - - @self.app.route('/get_latest_stats', methods=['GET']) + if self.do_robo or self.do_bs or self.api_stats_collection: + self.get_live_data() + summary_data = self._get_summary_zoom_stats() + if summary_data: + if self.do_bs: + lf_wifi_data = self.get_signal_and_channel_data_dict() + for hostname, stats in summary_data.items(): + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + stats["timestamp"] = timestamp + + if self.do_bs: + x, y, _, _ = self.robo_obj.get_robot_pose() + stats["X"] = x + stats["Y"] = y + stats["From_Coord"] = self.from_cord + stats["To_Coord"] = self.to_cord + sta_id = self.hostname_to_station_map.get( + hostname, None + ) + + if sta_id in lf_wifi_data: + stats.update(lf_wifi_data[sta_id]) + else: + stats.update( + { + "signal": "-", + "channel": "-", + "mode": "-", + "tx_rate": "-", + "rx_rate": "-", + "bssid": "-", + } + ) + + if self.do_robo or self.do_bs: + stats["current_cord"] = self.current_cord + if self.rotations_enabled: + stats["rotations_enabled"] = self.rotations_enabled + stats["current_angle"] = self.current_angle + else: + stats["rotations_enabled"] = False + + # --- CSV FILE PATH GENERATION --- + if self.do_robo: + if self.rotations_enabled: + csv_name = f"{hostname}_{self.current_cord}_{self.current_angle}.csv" + else: + csv_name = f"{hostname}_{self.current_cord}.csv" + else: + csv_name = f"{hostname}.csv" + + csv_file = os.path.join(self.path, csv_name) + + # --- WRITING DATA TO CSV --- + file_exists = ( + os.path.isfile(csv_file) and os.path.getsize(csv_file) > 0 + ) + + with open(csv_file, mode="a", newline="") as file: + headers = list(stats.keys()) + writer = csv.DictWriter(file, fieldnames=headers) + + if not file_exists: + writer.writeheader() + + writer.writerow(stats) + + return "Live Data Processed", 200 + else: + data = request.json + for hostname, stats in data.items(): + self.data_store[hostname] = stats + for hostname, stats in data.items(): + if self.do_robo: + if self.rotations_enabled: + csv_file = os.path.join( + self.path, + f"{hostname}_{self.current_cord}_{self.current_angle}.csv", + ) + else: + csv_file = os.path.join( + self.path, f"{hostname}_{self.current_cord}.csv" + ) + else: + csv_file = os.path.join(self.path, f"{hostname}.csv") + with open(csv_file, mode="a", newline="") as file: + writer = csv.writer(file) + + if os.path.getsize(csv_file) == 0: + writer.writerow(self.header) + + timestamp = stats.get("timestamp", "") + audio = stats.get("audio_stats", {}) + video = stats.get("video_stats", {}) + + row = [ + timestamp, + audio.get("frequency_sent", "0"), + audio.get("latency_sent", "0"), + audio.get("jitter_sent", "0"), + audio.get("packet_loss_sent", "0"), + audio.get("frequency_received", "0"), + audio.get("latency_received", "0"), + audio.get("jitter_received", "0"), + audio.get("packet_loss_received", "0"), + video.get("latency_sent", "0"), + video.get("jitter_sent", "0"), + video.get("packet_loss_sent", "0"), + video.get("resolution_sent", "0"), + video.get("frames_per_second_sent", "0"), + video.get("latency_received", "0"), + video.get("jitter_received", "0"), + video.get("packet_loss_received", "0"), + video.get("resolution_received", "0"), + video.get("frames_per_second_received", "0"), + ] + writer.writerow(row) + + return jsonify({"status": "success"}), 200 + + @self.app.route("/get_latest_stats", methods=["GET"]) def get_latest_stats(): # Return the latest data for all hostnames - return jsonify(self.data_store), 200 + return jsonify(self._get_summary_zoom_stats()), 200 - @self.app.route('/stop_zoom', methods=['GET']) + @self.app.route("/stop_zoom", methods=["GET"]) def stop_zoom(): """ Endpoint to stop the Zoom test and trigger a graceful application shutdown. """ - logging.info("Stopping the test through web UI") + logger.info("Stopping the test through web UI") self.stop_signal = True # Signal to stop the application # Respond to the client response = jsonify({"message": "Stopping Zoom Test"}) @@ -294,47 +593,139 @@ def stop_zoom(): shutdown_thread.start() return response + @self.app.route("/download_csv", methods=["GET"]) + def download_csv_flag(): + return jsonify({"download_csv": self.download_csv}) + + @self.app.route("/upload_csv", methods=["POST"]) + def upload_csv_data(): + try: + data = request.json + + if not data: + return ( + jsonify({"status": "error", "message": "No JSON received"}), + 400, + ) + + filename = data.get("filename", "csvdata.csv") + self.csv_file_name = f"received_{filename}" + rows = data.get("rows", []) + if not rows: + return ( + jsonify({"status": "error", "message": "No rows received"}), + 400, + ) + + filepath = f"received_{filename}" + logger.info( + f"Data Received from Zoom dashboard is stored at: {filepath}" + ) + with open(filepath, "w", newline="") as f: + writer = csv.writer(f) + if rows: + writer.writerows(rows) + self.is_csv_available = True + + return ( + jsonify( + { + "status": "success", + "message": f"Received {len(rows)} rows from {filename}", + "saved_as": filepath, + } + ), + 200, + ) + + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + @self.app.route("/upload_ping_log", methods=["POST"]) + def upload_ping_log(): + try: + if "file" not in request.files: + return jsonify({"status": "error", "message": "Missing file"}), 400 + + f = request.files["file"] + participant_name = request.form.get( + "participant_name", "unknown_participant" + ) + + if not f.filename: + return ( + jsonify({"status": "error", "message": "Empty filename"}), + 400, + ) + + ping_dir = os.path.join(self.path, "ping_logs") + os.makedirs(ping_dir, exist_ok=True) + + # Force controlled filename format to avoid unsafe names from client + save_name = f"{participant_name}_ping.log" + save_path = os.path.join(ping_dir, save_name) + f.save(save_path) + + return ( + jsonify( + { + "status": "success", + "message": "Ping log uploaded", + "saved_as": save_path, + } + ), + 200, + ) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + try: - self.app.run(host='0.0.0.0', port=5000, debug=True, threaded=True, use_reloader=False) + self.app.run( + host="0.0.0.0", port=5000, debug=True, threaded=True, use_reloader=False + ) except Exception as e: - logging.info(f"Error starting Flask server: {e}") + logger.info(f"Error starting Flask server: {e}") sys.exit(0) def shutdown(self): """ Gracefully shut down the application. """ - logging.info("Initiating graceful shutdown...") - - self.stop_signal = True - time.sleep(10) - logging.info("Exiting the application.") + if self.do_robo and self.api_stats_collection: + self.generate_report_from_data() + elif self.api_stats_collection: + self.generate_report_from_api() + self.generic_endps_profile.cleanup() + logger.info("Initiating graceful shutdown...") os._exit(0) def set_start_time(self): - self.start_time = datetime.now(self.tz) + timedelta(seconds=30) - self.end_time = self.start_time + timedelta(minutes=self.duration) + self.start_time = datetime.now(self.tz) + timedelta(seconds=60) + if self.do_bs: + self.end_time = self.start_time + timedelta(minutes=300000) + else: + self.end_time = self.start_time + timedelta(minutes=self.duration) return [self.start_time, self.end_time] def check_gen_cx(self): try: for gen_endp in self.generic_endps_profile.created_endp: - generic_endpoint = self.json_get(f'/generic/{gen_endp}') + generic_endpoint = self.json_get(f"/generic/{gen_endp}") if not generic_endpoint or "endpoint" not in generic_endpoint: - logging.info(f"Error fetching endpoint data for {gen_endp}") + logger.info(f"Error fetching endpoint data for {gen_endp}") return False endp_status = generic_endpoint["endpoint"].get("status", "") - if endp_status == "Run": + if endp_status not in ["Stopped", "WAITING", "NO-CX"]: return False return True except Exception as e: - logging.error(f"Error in check_gen_cx function {e}", exc_info=True) - logging.info(f"generic endpoint data {generic_endpoint}") + logger.error(f"Error in check_gen_cx function {e}", exc_info=True) + logger.info(f"generic endpoint data {generic_endpoint}") def wait_for_flask(self, url="http://127.0.0.1:5000/get_latest_stats", timeout=10): """Wait until the Flask server is up, but exit if it takes longer than `timeout` seconds.""" @@ -343,58 +734,141 @@ def wait_for_flask(self, url="http://127.0.0.1:5000/get_latest_stats", timeout=1 try: response = requests.get(url, timeout=1) if response.status_code == 200: - logging.info("✅ Flask server is up and running!") + logging.info("Flask server is up and running!") return except requests.exceptions.ConnectionError: time.sleep(1) - logging.error("❌ Flask server did not start within 10 seconds. Exiting.") + logging.error("Flask server did not start within 10 seconds. Exiting.") sys.exit(1) - def run(self, duration, upstream_port, signin_email, signin_passwd, participants): - # Store the email and password in the instance - self.signin_email = signin_email - self.signin_passwd = signin_passwd - self.duration = duration - self.upstream_port = upstream_port - self.participants_req = participants - flask_thread = threading.Thread(target=self.start_flask_server) - flask_thread.daemon = True - flask_thread.start() - self.wait_for_flask() - ports_list = [] - eid = "" - resource_ip = "" - user_resources = ['.'.join(item.split('.')[:2]) for item in self.real_sta_list] + def create_android( + self, + lanforge_res, + ports=None, + sleep_time=0.5, + debug_=False, + suppress_related_commands_=None, + real_client_os_types=None, + ): + if ports and real_client_os_types and len(real_client_os_types) == 0: + logger.error("Real client operating systems types is empty list") + raise ValueError("Real client operating systems types is empty list") + created_cx = [] + created_endp = [] + + if not ports: + ports = [] + + if self.debug: + debug_ = True + + post_data = [] + endp_tpls = [] + for port_name in ports: + port_info = self.name_to_eid(port_name) + resource = port_info[1] + shelf = port_info[0] + if real_client_os_types: + name = port_name + else: + name = port_info[2] + + gen_name_a = "%s-%s" % ("zoom", "_".join(port_name.split("."))) + endp_tpls.append((shelf, resource, name, gen_name_a)) + + for endp_tpl in endp_tpls: + shelf = endp_tpl[0] + resource = endp_tpl[1] + if real_client_os_types: + name = endp_tpl[2].split(".")[2] + else: + name = endp_tpl[2] + gen_name_a = endp_tpl[3] + + data = { + "alias": gen_name_a, + "shelf": shelf, + "resource": lanforge_res.split(".")[1], + "port": "eth0", + "type": "gen_generic", + } + self.json_post("cli-json/add_gen_endp", data, debug_=self.debug) + + self.json_post("/cli-json/nc_show_endpoints", {"endpoint": "all"}) + if sleep_time: + time.sleep(sleep_time) + + for endp_tpl in endp_tpls: + gen_name_a = endp_tpl[3] + self.generic_endps_profile.set_flags(gen_name_a, "ClearPortOnStart", 1) + + for endp_tpl in endp_tpls: + name = endp_tpl[2] + gen_name_a = endp_tpl[3] + cx_name = "CX_%s-%s" % ("generic", gen_name_a) + data = {"alias": cx_name, "test_mgr": "default_tm", "tx_endp": gen_name_a} + post_data.append(data) + created_cx.append(cx_name) + created_endp.append(gen_name_a) + + for data in post_data: + url = "/cli-json/add_cx" + self.json_post( + url, + data, + debug_=debug_, + suppress_related_commands_=suppress_related_commands_, + ) + if sleep_time: + time.sleep(sleep_time) + + for data in post_data: + self.json_post( + "/cli-json/show_cx", + {"test_mgr": "default_tm", "cross_connect": data["alias"]}, + ) + return True, created_cx, created_endp + + def get_resource_data(self): + self.ports_list = [] + self.user_list = [] + self.serial_list = [] + self.lanforge_port_list = [] + self.device_names = [] + self.user_resources = [ + ".".join(item.split(".")[:2]) for item in self.real_sta_list + ] # Step 1: Retrieve information about all resources response = self.json_get("/resource/all") # Step 2: Match user-specified resources with available resources sequentially - if user_resources: - # Iterate through user_resources sequentially, processing each value only once - for user_resource in user_resources: - # Break loop if no more user_resources left to process - if not user_resources: - break - - for key, value in response.items(): - if key == "resources": - for element in value: - for resource_key, resource_values in element.items(): - # Match the current user_resource - if resource_key == user_resource: - eid = resource_values["eid"] - resource_ip = resource_values['ctrl-ip'] - self.device_names.append(resource_values['hostname']) - ports_list.append({'eid': eid, 'ctrl-ip': resource_ip}) - break - else: - # Continue outer loop only if no break occurred - continue - # Break if a match was found and processed - break + if self.user_resources: + resources = response.get("resources", []) + for user_resource in self.user_resources: + found = False + for element in resources: + if user_resource in element: + resource_values = element[user_resource] + eid = resource_values["eid"] + resource_ip = resource_values["ctrl-ip"] + hostname = resource_values["hostname"] + user = resource_values["user"] + + self.device_names.append(hostname) + self.ports_list.append({"eid": eid, "ctrl-ip": resource_ip}) + self.user_list.append(user) + + found = True + break + + if not found: + logger.warning( + f"Resource {user_resource} not found in LANforge response" + ) - gen_ports_list = [] + def get_ports_data(self): + self.gen_ports_list = [] self.mac_list = [] self.rssi_list = [] self.link_rate_list = [] @@ -404,36 +878,36 @@ def run(self, duration, upstream_port, signin_email, signin_passwd, participants response_port = self.json_get("/port/all") # Step 4: Match ports associated with retrieved resources in the order of ports_list - for port_entry in ports_list: + for port_entry in self.ports_list: # Extract the eid and ctrl-ip from the current ports_list entry - expected_eid = port_entry['eid'] + expected_eid = port_entry["eid"] # Iterate over the port interfaces to find a matching port - for interface in response_port['interfaces']: - for port, _ in interface.items(): + for interface in response_port["interfaces"]: + for port, _port_data in interface.items(): # Extract the first two segments of the port identifier to match with expected_eid - result = '.'.join(port.split('.')[:2]) + result = ".".join(port.split(".")[:2]) # Check if the result matches the current expected eid from ports_list if result == expected_eid: - gen_ports_list.append(port.split('.')[-1]) + self.gen_ports_list.append(port.split(".")[-1]) break else: continue break - for port_entry in ports_list: + for port_entry in self.ports_list: # Extract the eid and ctrl-ip from the current ports_list entry - expected_eid = port_entry['eid'] + expected_eid = port_entry["eid"] # Iterate over the port interfaces to find a matching port - for interface in response_port['interfaces']: + for interface in response_port["interfaces"]: for port, port_data in interface.items(): # Extract the first two segments of the port identifier to match with expected_eid - result = '.'.join(port.split('.')[:2]) + result = ".".join(port.split(".")[:2]) # Check if the result matches the current expected eid from ports_list - if result == expected_eid and port_data["parent dev"] == 'wiphy0': + if result == expected_eid and port_data["parent dev"] == "wiphy0": self.mac_list.append(port_data["mac"]) self.rssi_list.append(port_data["signal"]) self.link_rate_list.append(port_data["rx-rate"]) @@ -443,82 +917,301 @@ def run(self, duration, upstream_port, signin_email, signin_passwd, participants else: continue break - self.new_port_list = [item.split('.')[2] for item in self.real_sta_list] + self.wifi_interface_list = [item.split(".")[2] for item in self.real_sta_list] + + def get_interop_data(self): + interop_data = self.json_get("/adb") + interop_mobile_data = interop_data.get("devices", {}) + self.serial_list = [] + self.lanforge_port_list = [] + for user in self.user_list: + if user == "": + self.serial_list.append("") + self.lanforge_port_list.append("") + else: + user_found = False + # 1. Handle Single Device (Flat Dictionary) + if isinstance(interop_mobile_data, dict): + if interop_mobile_data.get("user-name") == user: + # Extract details from 'name' (e.g., '1.1.3200f8664a91a5e9') + full_name = interop_mobile_data.get("name") + if full_name and full_name.count(".") >= 2: + resource = full_name.split(".")[1] + serial_no = full_name.split(".")[2] + self.serial_list.append(serial_no) + self.lanforge_port_list.append(f"1.{resource}.eth0") + user_found = True + else: + for mobile_device in interop_mobile_data: + for serial, device_data in mobile_device.items(): + if device_data.get("user-name") == user: + resource = serial.split(".")[1] + serial_no = serial.split(".")[2] + self.serial_list.append(serial_no) + lanforge_port = f"1.{resource}.eth0" + self.lanforge_port_list.append(lanforge_port) + user_found = True + break + if user_found: + break + + if not user_found: + self.serial_list.append("") + self.lanforge_port_list.append("") + + logger.debug(f"Checking serial list {self.serial_list}") + + def delete_current_csv_files(self): + filename_pattern = ( + f"*_{self.current_cord}_{self.current_angle}.csv" + if self.rotations_enabled + else f"*_{self.current_cord}.csv" + ) + csv_files_pattern = os.path.join(self.path, filename_pattern) + csv_files = glob.glob(csv_files_pattern) - if self.generic_endps_profile.create(ports=[self.real_sta_list[0]], real_client_os_types=[self.real_sta_os_type[0]]): - logging.info('Real client generic endpoint creation completed.') + for file_path in csv_files: + try: + os.remove(file_path) + logger.info(f"Deleted CSV file: {file_path}") + except Exception as e: + logger.error(f"Error deleting file {file_path}: {e}") + + def create_host(self): + if self.generic_endps_profile.create( + ports=[self.real_sta_list[0]], + real_client_os_types=[self.real_sta_os_type[0]], + ): + logger.info("Real client generic endpoint creation completed.") else: - logging.error('Real client generic endpoint creation failed.') + logger.error("Real client generic endpoint creation failed.") exit(0) if self.real_sta_os_type[0] == "windows": cmd = f"py zoom_host.py --ip {self.upstream_port}" - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[0], cmd) - elif self.real_sta_os_type[0] == 'linux': - - cmd = "su -l lanforge ctzoom.bash %s %s %s" % (self.new_port_list[0], self.upstream_port, "host") - - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[0], cmd) - elif self.real_sta_os_type[0] == 'macos': + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[0], cmd + ) + elif self.real_sta_os_type[0] == "linux": + cmd = "su -l lanforge ctzoom.bash %s %s %s" % ( + self.wifi_interface_list[0], + self.upstream_port, + "host", + ) + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[0], cmd + ) + elif self.real_sta_os_type[0] == "macos": cmd = "sudo bash ctzoom.bash %s %s" % (self.upstream_port, "host") - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[0], cmd) + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[0], cmd + ) self.generic_endps_profile.start_cx() time.sleep(5) + logger.debug(f"checking real sta list {self.real_sta_list}") + logger.debug(f"checking real sta os type {self.real_sta_os_type}") + + def wait_for_host_ready(self): while not self.login_completed: try: - self.login_completed = bool(int(self.redis_client.get('login_completed') or 0)) - - generic_endpoint = self.json_get(f'/generic/{self.generic_endps_profile.created_endp[0]}') + generic_endpoint = self.json_get( + f"/generic/{self.generic_endps_profile.created_endp[0]}" + ) endp_status = generic_endpoint["endpoint"]["status"] if endp_status == "Stopped": - logging.info("Failed to Start the Host Device") + logger.error("Failed to Start the Host Device") self.generic_endps_profile.cleanup() sys.exit(1) time.sleep(5) except Exception as e: - logging.info(f"Error while checking login_completed status: {e}") + logger.error(f"Error while checking login_completed status: {e}") time.sleep(5) - if self.generic_endps_profile.create(ports=self.real_sta_list[1:], real_client_os_types=self.real_sta_os_type[1:]): - logging.info('Real client generic endpoint creation completed.') - else: - logging.error('Real client generic endpoint creation failed.') - exit(0) + self.meet_link = f"https://us04web.zoom.us/j/{self.remote_login_url}?pwd={self.remote_login_passwd}" + logger.info(f"Meet link for android devices: {self.meet_link}") + + # Save meet link in a text file under self.path + try: + meet_link_file = os.path.join(self.path, "meet_link.txt") + with open(meet_link_file, "w") as f: + f.write(self.meet_link + "\n") + logger.info(f"Meet link saved to: {meet_link_file}") + except Exception as e: + logger.error(f"Failed to save meet link file: {e}") + + self.login_completed = False + + def create_participants(self): for i in range(1, len(self.real_sta_os_type)): + if self.real_sta_os_type[i] == "android": + status, created_cx, created_endp = self.create_android( + lanforge_res=self.lanforge_port_list[i], + ports=[self.real_sta_list[i]], + real_client_os_types=["Linux"], + ) + self.generic_endps_profile.created_endp.extend(created_endp) + self.generic_endps_profile.created_cx.extend(created_cx) + cmd = ( + f"python3 /home/lanforge/lanforge-scripts/py-scripts/real_application_tests/zoom_automation/android_zoom.py " + f"--serial {self.serial_list[i]} " + f"--meeting_url '{self.meet_link}' " + f"--participant_name '{self.real_sta_hostname[i]}' " + f"--server_host {self.mgr_ip} " + f"--server_port 5000" + ) + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[i], cmd + ) + + else: + self.generic_endps_profile.create( + ports=[self.real_sta_list[i]], + real_client_os_types=[self.real_sta_os_type[i]], + ) + for i in range(1, len(self.real_sta_os_type)): if self.real_sta_os_type[i] == "windows": cmd = f"py zoom_client.py --ip {self.upstream_port}" - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[i], cmd) - elif self.real_sta_os_type[i] == 'linux': - cmd = "su -l lanforge ctzoom.bash %s %s %s" % (self.new_port_list[i], self.upstream_port, "client") - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[i], cmd) - elif self.real_sta_os_type[i] == 'macos': + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[i], cmd + ) + elif self.real_sta_os_type[i] == "linux": + cmd = "su -l lanforge ctzoom.bash %s %s %s" % ( + self.wifi_interface_list[i], + self.upstream_port, + "client", + ) + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[i], cmd + ) + elif self.real_sta_os_type[i] == "macos": cmd = "sudo bash ctzoom.bash %s %s" % (self.upstream_port, "client") - self.generic_endps_profile.set_cmd(self.generic_endps_profile.created_endp[i], cmd) - - self.start_client_cx() + self.generic_endps_profile.set_cmd( + self.generic_endps_profile.created_endp[i], cmd + ) + + cx_name = self.generic_endps_profile.created_cx[i] + self.json_post( + "/cli-json/set_cx_state", + {"test_mgr": "default_tm", "cx_name": cx_name, "cx_state": "RUNNING"}, + debug_=True, + ) + logger.info(f"Sending running state to.. {cx_name}") + def wait_for_test_start(self): + # Wait for the test to be started + count = 0 while not self.test_start: - - logging.info("WAITING FOR THE TEST TO BE STARTED") + logger.info("WAITING FOR THE TEST TO BE STARTED") time.sleep(5) - + count += 1 + if count > 36: + logger.error( + "Unable to get the Test Start signal Even after 3 minutes. Exiting." + ) + sys.exit(1) + self.test_start = False + if self.do_bs: + self.bs_coord_result = self.robo_obj.get_coordinates_list() + logger.info(f"Total Coordinates to be Visited: {self.bs_coord_result}") + if self.bs_coord_result: + self.from_cord = self.coordinates_list[0] + self.successful_coords.append(self.from_cord) + self.current_cord = self.from_cord self.set_start_time() - logging.info("TEST WILL BE STARTING") + logger.info("TEST WILL BE STARTING") - while datetime.now(self.tz) < self.end_time or not self.check_gen_cx(): + def run(self): + self.create_host() + self.wait_for_host_ready() + self.create_participants() + self.wait_for_test_start() - time.sleep(5) + if self.do_bs: + time.sleep(60) + + try: + logger.info( + f"Band-Steering Test coordinates to be visited: {self.bs_coord_result}" + ) + + if not self.bs_coord_result: + logger.error( + "No coordinates available (bs_coord_result is empty). Skipping BS test." + ) + self.stop_signal = True + return + + for _idx, coordinate in enumerate(self.bs_coord_result): + logger.info(f"Moving robot to coordinate: {coordinate}") + self.from_cord = self.to_cord + self.to_cord = coordinate + + # Battery safety + self.robo_obj.wait_for_battery() + + matched, aborted = self.robo_obj.move_to_coordinate( + coord=coordinate + ) + if matched: + self.current_cord = coordinate + self.successful_coords.append(coordinate) + else: + self.failed_coords.append(coordinate) + if aborted: + logger.error(f"Failed to reach the {coordinate}") + self.failed_coords.append(coordinate) + sys.exit() + + logger.info( + "All coordinates completed — stopping Band-Steering Test" + ) + self.stop_signal = True + time.sleep(5) + except Exception as e: + logger.error(f"Error during band-steering operation: {e}", exc_info=True) + + finally: + count = 0 + if self.download_csv: + while not self.is_csv_available: + count += 1 + if count > 60: + logger.warning( + "CSV data from Zoom dashboard is not available after waiting for 5 minutes. Proceeding with report generation without CSV data." + ) + break + logger.info( + "Waiting for CSV data from Zoom dashboard to be available before proceeding with the Report generation and cleanup" + ) + time.sleep(5) - def start_client_cx(self): - client_cx = self.generic_endps_profile.created_cx[1:] - for cx_name in client_cx: - self.json_post("/cli-json/set_cx_state", { - "test_mgr": "default_tm", - "cx_name": cx_name, - "cx_state": "RUNNING" - }, debug_=True) + else: + while datetime.now(self.tz) < self.end_time or not self.check_gen_cx(): + if self.do_robo: + pause, _ = self.robo_obj.wait_for_battery() + if pause: + self.stop_signal = True + self.generic_endps_profile.stop_cx() + self.generic_endps_profile.cleanup() + self.delete_current_csv_files() + self.start_time = None + self.end_time = None + time.sleep(20) + self.stop_signal = False + self.participants_joined = 0 + self.create_host() + self.wait_for_host_ready() + self.create_participants() + self.wait_for_test_start() + logger.info("Monitoring the Test") + time.sleep(5) + if self.do_robo: + self.generic_endps_profile.stop_cx() + self.generic_endps_profile.cleanup() + self.start_time = None + self.end_time = None def select_real_devices(self, real_device_obj, real_sta_list=None): final_device_list = [] @@ -545,7 +1238,6 @@ def select_real_devices(self, real_device_obj, real_sta_list=None): 9. Returns the sorted list of selected real station names. """ - real_device_obj.get_devices() # Query and retrieve all user-defined real stations if `real_sta_list` is not provided if real_sta_list is None: self.real_sta_list, _, _ = real_device_obj.query_user() @@ -554,35 +1246,76 @@ def select_real_devices(self, real_device_obj, real_sta_list=None): interfaces = interface_data["interfaces"] final_device_list = [] # Initialize the list - for device in real_sta_list: # Iterate over devices in `real_sta_list` to preserve order + for ( + device + ) in ( + real_sta_list + ): # Iterate over devices in `real_sta_list` to preserve order + device_found = False for interface_dict in interfaces: # Iterate through `interfaces` - for key, value in interface_dict.items(): # Iterate through items of each interface dictionary + for ( + key, + value, + ) in ( + interface_dict.items() + ): # Iterate through items of each interface dictionary # Check conditions for adding the device key_parts = key.split(".") extracted_key = ".".join(key_parts[:2]) - if extracted_key == device and not value["phantom"] and not value["down"] and value["parent dev"] != "": - final_device_list.append(key) # Add to final_device_list in order + if ( + extracted_key == device + and not value["phantom"] + and not value["down"] + and value["parent dev"] != "" + ): + final_device_list.append( + key + ) # Add to final_device_list in order + device_found = True break # Stop after finding the first match for the current device to maintain order + if device_found: + break self.real_sta_list = final_device_list # Log an error and exit if no real stations are selected for testing if len(self.real_sta_list) == 0: - logger.error('There are no real devices in this testbed. Aborting test') + logger.error("There are no real devices in this testbed. Aborting test") exit(0) # Filter out iOS devices from the real_sta_list before proceeding self.real_sta_list = self.filter_ios_devices(self.real_sta_list) - # # Add real station data to `self.real_sta_data_dict` + + # Rebuild a clean, ordered and unique station list (avoid mutating while iterating) + self.real_sta_data = {} + cleaned_sta_list = [] + seen_sta = set() + for sta_name in self.real_sta_list: + if sta_name in seen_sta: + continue if sta_name not in real_device_obj.devices_data: - self.real_sta_list.remove(sta_name) - logger.error('Real station not in devices data, ignoring it from testing') + logger.error( + "Real station not in devices data, ignoring it from testing" + ) continue + seen_sta.add(sta_name) + cleaned_sta_list.append(sta_name) self.real_sta_data[sta_name] = real_device_obj.devices_data[sta_name] - self.real_sta_os_type = [self.real_sta_data[real_sta_name]['ostype'] for real_sta_name in self.real_sta_data] - self.real_sta_hostname = [self.real_sta_data[real_sta_name]['hostname'] for real_sta_name in self.real_sta_data] + self.real_sta_list = cleaned_sta_list + self.real_sta_os_type = [ + self.real_sta_data[real_sta_name]["ostype"] + for real_sta_name in self.real_sta_data + ] + self.real_sta_hostname = [ + ( + self.real_sta_data[real_sta_name]["hostname"] + if self.real_sta_data[real_sta_name]["ostype"] != "android" + else self.real_sta_data[real_sta_name]["user"] + ) + for real_sta_name in self.real_sta_data + ] self.zoom_host = self.real_sta_list[0] self.hostname_os_combination = [ @@ -590,17 +1323,599 @@ def select_real_devices(self, real_device_obj, real_sta_list=None): for hostname, os_type in zip(self.real_sta_hostname, self.real_sta_os_type) ] - for _, value in self.real_sta_data.items(): - if value['ostype'] == 'windows': + for _key, value in self.real_sta_data.items(): + if value["ostype"] == "windows": self.windows = self.windows + 1 - elif value['ostype'] == 'macos': + elif value["ostype"] == "macos": self.mac = self.mac + 1 - elif value['ostype'] == 'linux': + elif value["ostype"] == "linux": self.linux = self.linux + 1 + elif value["ostype"] == "android": + self.android = self.android + 1 + + # Create mapping: { 'Hostname': 'Station_ID' } + self.hostname_to_station_map = dict( + zip(self.real_sta_hostname, self.real_sta_list) + ) # Return the sorted list of selected real station names return self.real_sta_list + def get_signal_and_channel_data_dict(self): + """ + Returns a dictionary of LANforge stats keyed by station name. + Example: {'sta001': {'lf_signal': -55, 'lf_channel': 36, ...}} + """ + lf_stats_map = {} + interfaces_dict = dict() + + try: + # Get raw data from LANforge API + port_data = self.json_get("/ports/all/")["interfaces"] + for port in port_data: + interfaces_dict.update(port) + except Exception as e: + logger.error(f"Error fetching port data: {e}") + return {} + + # Loop through your managed stations (e.g., sta001, sta002) + for sta in self.real_sta_list: + # Default values if station is missing + lf_stats_map[sta] = { + "signal": "-", + "channel": "-", + "mode": "-", + "tx_rate": "-", + "rx_rate": "-", + "bssid": "-", + } + + if sta in interfaces_dict: + data = interfaces_dict[sta] + + # --- Signal Parsing --- + sig = data.get("signal", "-") + if "dBm" in str(sig): + lf_stats_map[sta]["signal"] = sig.split(" ")[0] + else: + lf_stats_map[sta]["signal"] = sig + + # --- Other Fields --- + lf_stats_map[sta]["channel"] = data.get("channel", "-") + lf_stats_map[sta]["mode"] = data.get("mode", "-") + lf_stats_map[sta]["tx_rate"] = data.get("tx-rate", "-") + lf_stats_map[sta]["rx_rate"] = data.get("rx-rate", "-") + lf_stats_map[sta]["bssid"] = data.get( + "ap", "-" + ) # 'ap' is usually BSSID + + return lf_stats_map + + def get_access_token(self, account_id, client_id, client_secret): + token_url = f"https://zoom.us/oauth/token?grant_type=account_credentials&account_id={account_id}" + response = requests.post( + token_url, auth=HTTPBasicAuth(client_id, client_secret) + ) + if response.status_code == 200: + access_token = response.json().get("access_token") + return access_token + else: + raise Exception( + f"Failed to get access token: {response.status_code} {response.text}" + ) + + def get_participants_qos(self, meeting_id, access_token, test_type="past"): + url = f"https://api.zoom.us/v2/metrics/meetings/{meeting_id}/participants/qos" + headers = {"Authorization": f"Bearer {access_token}"} + params = {"type": test_type} + all_participants = [] + next_page_token = None + + try: + while True: + if next_page_token: + params["next_page_token"] = next_page_token + + response = requests.get(url, headers=headers, params=params) + if response.status_code == 200: + data = response.json() + participants = data.get("participants", []) + all_participants.extend(participants) + next_page_token = data.get("next_page_token") + if not next_page_token: + break + else: + raise Exception( + f"Failed to get participants QoS: {response.status_code} {response.text}" + ) + except Exception as e: + cached_qos = self._get_raw_zoom_stats() + if cached_qos: + logger.warning( + f"Failed to get participants QoS for {test_type}. Using last cached participant QoS data: {e}" + ) + return cached_qos + raise + + if all_participants: + return self._set_raw_zoom_stats(all_participants) + + cached_qos = self._get_raw_zoom_stats() + if cached_qos: + logger.warning( + f"Zoom API returned no participant QoS data for {test_type}. Using last cached participant QoS data." + ) + return cached_qos + + logger.warning( + f"Zoom API returned no participant QoS data for {test_type} and no cached data is available." + ) + return [] + + def save_json(self, data, filename): + os.makedirs("zoom_api_responses", exist_ok=True) + path = os.path.join("zoom_api_responses", filename) + with open(path, "w") as f: + json.dump(data, f, indent=2) + + def get_live_data(self): + try: + # retrieving with past meetings + token = self.get_access_token( + self.account_id, self.client_id, self.client_secret + ) + self._set_raw_zoom_stats( + self.get_participants_qos(self.remote_login_url, token, "live") + ) + self.summarize_audio_video(self._get_raw_zoom_stats()) + + except Exception as e: + logger.info( + f"Unable to fetch live meeting data...retrying in 5 seconds {e}" + ) + + def get_final_qos_data(self): + # 1. Check Credentials (using instance variables) + if not all([self.account_id, self.client_id, self.client_secret]): + logger.error("Exiting test due to missing credentials.") + raise ValueError( + "Missing Zoom credentials (self.account_id, self.client_id, self.client_secret)" + ) + + meeting_id = self.remote_login_url + logger.info(f"Meeting ID: {meeting_id}") + + # 2. Get Token & Wait for Data Indexing + token = self.get_access_token( + self.account_id, self.client_id, self.client_secret + ) + + # Zoom QoS data is typically available ~20 seconds after meeting end. + # We wait 150 seconds to be safe and simplify the logic. + wait_time = 150 + logger.info( + f"Waiting {wait_time} seconds for Zoom servers to index past meeting QoS data..." + ) + time.sleep(wait_time) + + # 3. Fetch Data (Try 'Past' first, fallback to 'Live') + try: + logger.info("Attempting to fetch 'past' meeting data...") + past_qos_data = self.get_participants_qos(meeting_id, token, "past") + + # If past data is empty, raise error to trigger fallback + if not past_qos_data: + raise ValueError("Zoom API returned empty data for past meeting.") + + except Exception as e: + logger.warning( + f"Could not fetch 'past' data ({e}). Falling back to 'live' meeting data..." + ) + try: + self.get_participants_qos(meeting_id, token, "live") + except Exception as e_live: + logger.error(f"Failed to fetch both past and live data: {e_live}") + + # 4. Summarize and Save JSON + raw_qos_data = self._get_raw_zoom_stats() + summary_data = self.summarize_audio_video(raw_qos_data) + + # Construct JSON filename + if self.do_robo: + json_name = ( + f"{meeting_id}_{self.current_cord}_{self.current_angle}_qos.json" + ) + else: + json_name = f"{meeting_id}_qos.json" + + self.save_json(raw_qos_data, json_name) + + # 5. Write to CSV (Integrated Logic) + if self.do_robo or self.do_bs or self.api_stats_collection: + if summary_data: + logger.info("Writing final QoS data to CSV...") + + # Fetch Wifi Data if needed + lf_wifi_data = {} + if self.do_bs: + try: + lf_wifi_data = self.get_signal_and_channel_data_dict() + except Exception as e: + logger.warning(f"Could not fetch WiFi data for CSV: {e}") + + for hostname, stats in summary_data.items(): + final_filename = hostname + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + stats["timestamp"] = timestamp + + # Add Robot/BS specific data + if self.do_bs: + x, y, _, _ = self.robo_obj.get_robot_pose() + stats["X"] = x + stats["Y"] = y + stats["From_Coord"] = self.from_cord + stats["To_Coord"] = self.to_cord + + sta_id = self.hostname_to_station_map.get(final_filename, None) + if sta_id in lf_wifi_data: + stats.update(lf_wifi_data[sta_id]) + else: + stats.update( + { + "signal": "-", + "channel": "-", + "mode": "-", + "tx_rate": "-", + "rx_rate": "-", + "bssid": "-", + } + ) + + # Add Coordinate/Angle data + if self.do_robo or self.do_bs: + stats["current_cord"] = self.current_cord + if self.rotations_enabled: + stats["rotations_enabled"] = self.rotations_enabled + stats["current_angle"] = self.current_angle + else: + stats["rotations_enabled"] = False + + # Generate CSV Filename + if self.do_robo: + if self.rotations_enabled: + csv_name = f"{final_filename}_{self.current_cord}_{self.current_angle}.csv" + else: + csv_name = f"{final_filename}_{self.current_cord}.csv" + else: + csv_name = f"{final_filename}.csv" + + csv_file = os.path.join(self.path, csv_name) + + # Write to File + try: + file_exists = ( + os.path.isfile(csv_file) and os.path.getsize(csv_file) > 0 + ) + + with open(csv_file, mode="a", newline="") as file: + headers = list(stats.keys()) + writer = csv.DictWriter(file, fieldnames=headers) + + if not file_exists: + writer.writeheader() + + writer.writerow(stats) + except Exception as e: + logger.error(f"Failed to write CSV for {hostname}: {e}") + + def parse_value(self, value): + """Convert Zoom string values to float. Handles kbps, ms, and %.""" + if not value or value in ["-", ""]: + return None + try: + return float(value.split()[0].replace("%", "")) + except Exception as e: + logger.error(f"Error parsing value '{value}': {e}") + return None + + def parse_zoom_value(self, value): + """ + Convert Zoom string metrics into a float. + Handles cases like: + - "123 kbps" + - "21 ms" + - "5.6 %" + - "21 ms/40 ms" + - "Good(4.41)" + - "-" or empty values + """ + if not value or str(value).strip() in ["-", ""]: + return None + + value = str(value).strip() + + # Handle formats like "Good(4.41)" + if re.match(r"^[A-Za-z]+\([\d.]+\)$", value): + return value + + # Handle "21 ms/40 ms" (avg/max -> take avg only) + if "/" in value: + avg_part = value.split("/", 1)[0].strip() + + # Missing avg like "-/3.9 %" should not use max + if avg_part in ["", "-"]: + return None + + nums = re.findall(r"[\d.]+", avg_part) + return float(nums[0]) if nums else None + + # General case: "123 kbps", "45 ms", "6.7 %" + try: + return float(value.split()[0].replace("%", "")) + except Exception: + return None + + def _clean_zoom_participant_name(self, participant_name): + if participant_name is None: + return None + + return str(participant_name).replace("(Guest)", "").strip() + + def _get_raw_zoom_stats(self): + raw_qos = self.zoom_stats_data.get("raw_qos", []) + return raw_qos if isinstance(raw_qos, list) else [] + + def _set_raw_zoom_stats(self, raw_qos): + self.zoom_stats_data["raw_qos"] = list(raw_qos) if raw_qos else [] + return self.zoom_stats_data["raw_qos"] + + def _get_summary_zoom_stats(self): + summary = self.zoom_stats_data.get("summary", {}) + return summary if isinstance(summary, dict) else {} + + def _set_summary_zoom_stats(self, summary): + self.zoom_stats_data["summary"] = summary if isinstance(summary, dict) else {} + return self.zoom_stats_data["summary"] + + def _get_report_device_data(self, source_data=None): + if source_data is not None: + if isinstance(source_data, dict): + return source_data + if isinstance(source_data, list): + return self.summarize_audio_video(source_data) if source_data else {} + return {} + + summary_data = self._get_summary_zoom_stats() + if summary_data: + return summary_data + + raw_qos_data = self._get_raw_zoom_stats() + if raw_qos_data: + return self.summarize_audio_video(raw_qos_data) + + return {} + + def _match_summary_data_to_hostnames(self, summary, host_key=None): + if not summary or not self.real_sta_hostname: + return summary + + normalized_summary = {} + used_source_keys = set() + target_host_key = self.real_sta_hostname[0] + + if host_key in summary: + host_stats = dict(summary[host_key]) + host_stats["is_host"] = True + normalized_summary[target_host_key] = host_stats + used_source_keys.add(host_key) + + remaining_source_keys = [ + key for key in summary.keys() if key not in used_source_keys + ] + remaining_target_keys = [ + hostname + for hostname in self.real_sta_hostname + if hostname not in normalized_summary + ] + + for hostname in list(remaining_target_keys): + cleaned_hostname = self._clean_zoom_participant_name(hostname) + matched_source_key = next( + ( + source_key + for source_key in remaining_source_keys + if self._clean_zoom_participant_name(source_key) + and cleaned_hostname + and self._clean_zoom_participant_name(source_key) + == cleaned_hostname + ), + None, + ) + if matched_source_key is None: + continue + + normalized_summary[hostname] = dict(summary[matched_source_key]) + used_source_keys.add(matched_source_key) + remaining_source_keys.remove(matched_source_key) + + for source_key, stats in summary.items(): + if source_key not in used_source_keys: + normalized_summary[source_key] = dict(stats) + self._set_summary_zoom_stats(normalized_summary) + if self.do_robo: + self.save_json( + self._get_summary_zoom_stats(), + f"{self.remote_login_url}_{self.current_cord}_{self.current_angle}_qos.json", + ) + self.save_json( + self._get_raw_zoom_stats(), + f"{self.remote_login_url}_{self.current_cord}_{self.current_angle}_raw_qos.json", + ) + else: + self.save_json( + self._get_summary_zoom_stats(), f"{self.remote_login_url}_qos.json" + ) + self.save_json( + self._get_raw_zoom_stats(), f"{self.remote_login_url}_raw_qos.json" + ) + return normalized_summary + + def summarize_csv_audio_video(self, csv_path): + # Step 1: Find the correct header line + with open(csv_path, "r", encoding="utf-8-sig") as f: + lines = f.readlines() + + meeting_summary = pd.read_csv(csv_path, nrows=1, encoding="utf-8-sig") + csv_host_name = None + if not meeting_summary.empty and "Host" in meeting_summary.columns: + host_value = meeting_summary.iloc[0].get("Host") + if pd.notna(host_value): + csv_host_name = self._clean_zoom_participant_name(host_value) + + # Step 2: Find the line index where real participant data header starts + header_line_idx = None + for i, line in enumerate(lines): + if line.strip().startswith("Participant,"): + header_line_idx = i + break + + if header_line_idx is None: + raise ValueError( + "Could not find the participant metrics section in the CSV." + ) + + # Step 3: Read only the participant section + df = pd.read_csv(csv_path, skiprows=header_line_idx, encoding="utf-8-sig") + df.columns = df.columns.str.strip() + + # Mapping from JSON-style keys to CSV columns + metric_map = { + # Audio + "audio_output_bitrate_avg": "Audio (Sending) Bitrate", + "audio_input_bitrate_avg": "Audio (Receiving) Bitrate", + "audio_output_latency_avg": "Audio (Sending) Latency-Avg/Max", + "audio_input_latency_avg": "Audio (Receiving) Latency-Avg/Max", + "audio_output_jitter_avg": "Audio (Sending) Jitter-Avg/Max", + "audio_input_jitter_avg": "Audio (Receiving) Jitter-Avg/Max", + "audio_output_avg_loss_avg": "Audio (Sending) Packet Loss-Avg/Max", + "audio_input_avg_loss_avg": "Audio (Receiving) Packet Loss-Avg/Max", + "audio_mos_avg": "Audio Quality", + # Video + "video_output_bitrate_avg": "Video (Sending) Bitrate", + "video_input_bitrate_avg": "Video (Receiving) Bitrate", + "video_output_latency_avg": "Video (Sending) Latency-Avg/Max", + "video_input_latency_avg": "Video (Receiving) Latency-Avg/Max", + "video_output_jitter_avg": "Video (Sending) Jitter-Avg/Max", + "video_input_jitter_avg": "Video (Receiving) Jitter-Avg/Max", + "video_output_avg_loss_avg": "Video (Sending) Packet Loss-Avg/Max", + "video_input_avg_loss_avg": "Video (Receiving) Packet Loss-Avg/Max", + "video_output_frame_rate_avg": "Video (Sending) Frame Rate", + "video_input_frame_rate_avg": "Video (Receiving) Frame Rate", + "video_mos_avg": "Video Quality", + } + + summary = {} + host_device_key = None + + for index, row in df.iterrows(): + participant_value = row.get("Participant") + if pd.isna(participant_value): + continue + + device = self._clean_zoom_participant_name(participant_value) + if not device: + continue + + summary[device] = {key: None for key in metric_map} + summary[device]["is_host"] = False + + for metric_key, csv_column in metric_map.items(): + raw_value = row.get(csv_column) + parsed_value = self.parse_zoom_value(raw_value) + if isinstance(parsed_value, float): + summary[device][metric_key] = round(parsed_value, 2) + else: + summary[device][metric_key] = parsed_value + + if ( + csv_host_name + and self._clean_zoom_participant_name(device) + and self._clean_zoom_participant_name(device) == csv_host_name + ): + summary[device]["is_host"] = True + host_device_key = device + elif index == 0 and host_device_key is None: + host_device_key = device + + if not summary: + return summary + + if host_device_key not in summary: + host_device_key = next(iter(summary)) + + summary[host_device_key]["is_host"] = True + return self._match_summary_data_to_hostnames(summary, host_device_key) + + def summarize_audio_video(self, json_data): + """ + Summarize per-device audio and video stats: avg/max of bitrate, jitter, latency, packet loss. + + Args: + json_data (list): Zoom JSON as list of participants. + + Returns: + dict: {device_name: {metric_field_avg/max: value, ...}} + """ + if not json_data: + summary_data = self._get_summary_zoom_stats() + return summary_data if summary_data else {} + + metrics = ["audio_input", "audio_output", "video_input", "video_output"] + fields = ["bitrate", "latency", "jitter", "avg_loss", "frame_rate"] + + summary = {} + count = 0 + host_device_key = None + for _index, participant in enumerate(json_data): + participant_name = participant.get( + "user_name" + ) or "Unknown Device {count}".format(count=count + 1) + device = self._clean_zoom_participant_name(participant_name) + if device not in summary: + summary[device] = { + f"{m}_{f}_avg": None for m in metrics for f in fields + } + summary[device].update( + {"is_host": participant.get("is_original_host", False)} + ) + if participant.get("is_original_host", False): + host_device_key = device + + temp_values = {m: {f: [] for f in fields} for m in metrics} + + for sample in participant.get("user_qos", []): + for m in metrics: + data = sample.get(m, {}) + for f in fields: + val = self.parse_value(data.get(f)) + if val is not None: + temp_values[m][f].append(val) + + # calculate avg and max + for m in metrics: + for f in fields: + vals = temp_values[m][f] + if vals: + summary[device][f"{m}_{f}_avg"] = round( + sum(vals) / len(vals), 2 + ) + + if summary and host_device_key not in summary: + host_device_key = next(iter(summary)) + summary[host_device_key]["is_host"] = True + + return self._match_summary_data_to_hostnames(summary, host_device_key) + def check_tab_exists(self): """ Checks if the 'generic' tab exists by making a JSON GET request. @@ -1410,87 +2725,1602 @@ def filter_ios_devices(self, device_list): self.device_list = filtered_list return filtered_list + def add_bandsteering_report_section(self, report=None): + try: -def main(): - try: - parser = argparse.ArgumentParser( - prog=__file__, - formatter_class=argparse.RawTextHelpFormatter, - description=textwrap.dedent(''' - Zoom Automation Script + """ + Bandsteering reporting (Robo-style): + Reads all zoom stats CSVs from report directory (self.path) and builds: + - BSSID change count graph per device + - Table of BSSID change events + """ + if report is None: + logger.error("Bandsteering report: report object is None") + return + + report_dir = self.path + + if not report_dir or not os.path.isdir(report_dir): + logger.error(f"Bandsteering report: invalid report dir: {report_dir}") + return + + logger.info(f"Bandsteering report dir: {report_dir}") + + # Search for CSV files in self.path + csv_files = glob.glob(os.path.join(report_dir, "*.csv")) + logger.info(f"Bandsteering CSV files found: {csv_files}") + + if not csv_files: + logger.warning("No CSVs found in report dir for bandsteering") + return + + report.set_obj_html( + _obj_title="Band Steering Statistics", + _obj="This section summarizes BSSID changes observed while the robot moved between coordinates.", + ) + report.build_objective() + + allowed_bssids = set(self.bssids) if self.bssids else set() + + for csv_file_path in csv_files: + try: + df = pd.read_csv(csv_file_path) + except Exception as e: + logger.error( + f"Unable to read CSV {csv_file_path}: {e}", exc_info=True + ) + continue + + # Rename columns to match the specific capitalization expected by this logic + df.rename( + columns={ + "timestamp": "TimeStamp", + "bssid": "BSSID", + "channel": "Channel", + }, + inplace=True, + ) + + required_cols = { + "TimeStamp", + "BSSID", + "From_Coord", + "To_Coord", + "Channel", + } + + # Check if this CSV actually contains bandsteering data (skip summary/other CSVs) + if not required_cols.issubset(df.columns): + continue + + device_name = os.path.basename(csv_file_path).replace(".csv", "") + + # Clean columns + df["BSSID"] = df["BSSID"].fillna("NA").astype(str) + df["TimeStamp"] = df["TimeStamp"].fillna("NA").astype(str) + df["From_Coord"] = df["From_Coord"].fillna("NA").astype(str) + df["To_Coord"] = df["To_Coord"].fillna("NA").astype(str) + df["Channel"] = df["Channel"].fillna("NA").astype(str) + + # Filter only configured BSSIDs (if provided) + if allowed_bssids: + df = df[df["BSSID"].isin(allowed_bssids)] + + if df.empty: + logging.info(f"No matching BSSID rows for {device_name}") + + # Detect change points + df["prev_bssid"] = df["BSSID"].shift() + + mask = ( + (df["BSSID"] != df["prev_bssid"]) + & (df["BSSID"] != "NA") + & (df["prev_bssid"] != "NA") + & (df["prev_bssid"].notnull()) + ) + + bssid_list = df.loc[mask, "BSSID"].tolist() + timestamp_list = df.loc[mask, "TimeStamp"].tolist() + from_coordinate_list = df.loc[mask, "From_Coord"].tolist() + to_coordinate_list = df.loc[mask, "To_Coord"].tolist() + channel_list = df.loc[mask, "Channel"].tolist() + + skip_table = not mask.any() + + # Count BSSID switches + if skip_table: + # Ensure all expected BSSIDs show zero + bssid_counts = {bssid: 0 for bssid in self.bssids} + else: + bssid_counts = Counter(bssid_list) + + # Ensure consistent graph ordering + if self.bssids: + final_bssid_counts = { + bssid: bssid_counts.get(bssid, 0) for bssid in self.bssids + } + else: + final_bssid_counts = bssid_counts + + x_axis = list(final_bssid_counts.keys()) + y_axis = [[float(v)] for v in final_bssid_counts.values()] + + report.set_obj_html( + _obj_title=f"BSSID Change Count Of The Client {device_name}", + _obj=" ", + ) + report.build_objective() + + graph = lf_bar_graph( + _data_set=y_axis, + _xaxis_name="BSSID", + _yaxis_name="Number of Changes", + _xaxis_categories=[""], + _xaxis_label=x_axis, + _graph_image_name=f"zoom_bssid_change_count_{device_name}", + _label=x_axis, + _xaxis_step=1, + _graph_title=f"Zoom Bandsteering: BSSID change count for device : {device_name}", + _title_size=16, + _bar_width=0.15, + _figsize=(18, 6), + _dpi=96, + _show_bar_value=True, + _enable_csv=True, + ) + + graph_png = graph.build_bar_graph() + report.set_graph_image(graph_png) + report.move_graph_image() + report.set_csv_filename(graph_png) + report.move_csv_file() + report.build_graph() + + if skip_table: + report.set_obj_html( + _obj_title=f"Band Steering Results for {device_name}", + _obj="No band steering events observed for the configured BSSID list.", + ) + report.build_objective() + continue + + report.set_obj_html( + _obj_title=f"Band Steering Results for {device_name}", _obj=" " + ) + report.build_objective() + + table_df = pd.DataFrame( + { + "TimeStamp": timestamp_list, + "BSSID": bssid_list, + "Channel": channel_list, + "From Coordinate": from_coordinate_list, + "To Coordinate": to_coordinate_list, + } + ) + + report.set_table_dataframe(table_df) + report.build_table() + + # Handle Charging Timestamps (Check if robo_obj exists first) + if ( + hasattr(self, "robo_obj") + and hasattr(self.robo_obj, "charging_timestamps") + and len(self.robo_obj.charging_timestamps) != 0 + ): + report.set_obj_html(_obj_title="Charging Timestamps", _obj="") + report.build_objective() + df = pd.DataFrame( + self.robo_obj.charging_timestamps, + columns=[ + "charge_dock_arrival_timestamp", + "charging_completion_timestamp", + ], + ) + # Add S.No column + df.insert(0, "S.No", range(1, len(df) + 1)) + report.set_table_dataframe(df) + report.build_table() + else: + report.set_obj_html( + _obj_title="Charging Timestamps", + _obj="Robot did not go to charge during this test", + ) + report.build_objective() + except Exception as e: + logger.error(f"Exeception Occured {e}") + logger.error("Error Occured ", exc_info=True) + + def add_live_view_images_to_report(self): + """ + Waits for and adds the Video and Audio heatmap images for Floor 1. + """ + live_view_dir = os.path.join(self.path, "live_view_images") + + # Define the specific filenames for Floor 1 + video_img_name = f"zoom_video_{self.testname}_floor1.png" + audio_img_name = f"zoom_audio_{self.testname}_floor1.png" + + video_path = os.path.join(live_view_dir, video_img_name) + audio_path = os.path.join(live_view_dir, audio_img_name) + + timeout = 90 # seconds + start_time = time.time() + + # 1. Wait for the Video image (Primary trigger) + while not (os.path.exists(video_path) and os.path.exists(audio_path)): + if time.time() - start_time > timeout: + logger.error(f"Timeout: {video_img_name} not found within 60 seconds.") + break + time.sleep(1) + + if os.path.exists(video_path): + logger.info(f"Found video heatmap image: {video_path}") + else: + logger.warning(f"Video heatmap image not found: {video_path}") + + if os.path.exists(audio_path): + logger.info(f"Found audio heatmap image: {audio_path}") + else: + logger.warning(f"Audio heatmap image not found: {audio_path}") + + # 2. Build the HTML Report Content + html_content = "" + + # Add Video Map (if found) + if os.path.exists(video_path): + html_content += ( + '
' + '