This guide provides comprehensive information for developers who want to understand, modify, or extend the General Communications system.
The system uses a thread-per-client model:
- Server: Main thread accepts connections, spawns worker threads per client
- Client: Separate threads for heartbeat, message receiving, and user input
- Synchronization: Minimal shared state, mostly message passing
- Custom TCP protocol with multiple specialized ports
- Each port serves a specific communication purpose
- Stateful connections with proper cleanup handling
- Clear separation between server, client, and shared components
- Plugin system through mods/ directories
- C library integration for performance-critical operations
src/
├── Server/              # Server-side components
│   ├── server.py        # Main server application
│   ├── db.py            # Database operations
│   ├── c16.py           # C library interface
│   ├── 16.c             # C encoding library
│   └── mods/            # Server plugins
├── Client/              # Client-side components
│   ├── comms.py         # Client application
│   ├── db.py            # Client database operations
│   └── mods/            # Client plugins
└── Components/          # Shared components
    └── c16 components/  # Compiled libraries
- Python 3.6+ with development headers
- C compiler (gcc, clang, or MSVC)
- Git for version control
- Text editor or IDE with Python support
pip install prompt_toolkit pytest black flake8 mypy
# Development build with debug symbols
cd src/Server
gcc -shared -fPIC -g -O0 -o lib16.so 16.c
# Production build
gcc -shared -fPIC -O2 -o lib16.so 16.c
def acception():
    # Main server loop
    # 1. Accept client connections
    # 2. Spawn authentication thread
    # 3. Add client to active list
    # 4. Handle graceful shutdown
- Connection: Client connects to port 10740
- Authentication: Multi-stage auth on ports 9281/12090
- Active State: Heartbeat monitoring and message handling
- Cleanup: Automatic removal on disconnect/timeout
- Main Thread: Connection acceptance
- Auth Threads: One per connecting client
- Heartbeat Threads: One per authenticated client
- Message Threads: One listener per client
def main():
    # 1. Initial connection and ping
    # 2. Authentication handshake
    # 3. Start background threads
    # 4. Enter message input loop
- Main Thread: User input and message sending
- Heartbeat Thread: Respond to server health checks
- Message Thread: Receive and display incoming messages
{id};{timestamp};{username};{message_content}
- Storage: Append-only message logging
- Retrieval: ID-based and time-based queries
- Parsing: Extract components from message records
- Validation: Type checking and format verification
// Character to two-character encoding
char* encode_B(char* in) {
// Split byte into nibbles
// Add 'A' offset for ASCII representation
// Return allocated string
}
# Load platform-appropriate library
import ctypes
import sys

# Load exactly one library, chosen by the running platform. Loading both
# unconditionally fails on whichever platform lacks the other file.
if sys.platform.startswith("win"):
    c16 = ctypes.CDLL("./16.dll")  # Windows
else:
    c16 = ctypes.CDLL("./lib16.so")  # Linux

# Configure return types (ctypes assumes a C int return unless told otherwise)
c16.encode_B.restype = ctypes.c_char_p
c16.decode_B.restype = ctypes.c_char_p
# Define new port and message format
NEW_FEATURE_PORT = 11000


# Update server.py
def handle_new_feature(client):
    """Server-side handler for the new feature (placeholder, does nothing yet).

    Args:
        client: The client the feature is being handled for.
    """
    # Implementation here
    pass


# Update comms.py
def use_new_feature():
    """Client-side entry point for the new feature (placeholder, does nothing yet)."""
    # Client-side implementation
    pass
# Update db.py parsing functions
def fetch_new_field(message):
# Extract new field from message
pass
# Update message format
def add_message(time, user, message, new_field):
# Include new field in storage
pass
// Add new function to 16.c
char* new_encoding_function(char* input) {
// Implementation
return result;
}
# Update c16.py interface
c16.new_encoding_function.restype = ctypes.c_char_p
# test/test_db.py
import unittest

from src.Server import db


class TestDatabase(unittest.TestCase):
    """Unit tests for the message-record helpers in the server ``db`` module."""

    def test_message_validation(self):
        """A record in ``id;timestamp;username;content`` form is accepted."""
        valid_msg = "1;1234567890.123;user;Hello"
        self.assertTrue(db.validate_message(valid_msg))

    def test_message_parsing(self):
        """``fetch_user`` returns the username field of a record."""
        msg = "1;1234567890.123;user;Hello"
        self.assertEqual(db.fetch_user(msg), "user")
# test/test_integration.py
def test_client_server_communication():
    """Integration-test outline for a client/server round trip (placeholder)."""
    # Start server in background
    # Connect client
    # Send test message
    # Verify message received
    pass
# test/test_load.py
def test_multiple_clients():
    """Load-test outline for many concurrent clients (placeholder)."""
    # Spawn multiple client threads
    # Send concurrent messages
    # Verify all messages received
    # Check server stability
    pass
# Enable debug mode in server.py
debug = 1
# Add logging
import logging
logging.basicConfig(level=logging.DEBUG)
logging.debug(f"Client {client} connected")
# Add debug prints in comms.py
print(f"Sending message: {message}")
print(f"Received: {messages}")
# Monitor network traffic
sudo tcpdump -i lo port 10740
# Check port usage
netstat -tulpn | grep python
# mods/example_plugin.py
def on_client_connect(client_ip):
    """Called when client connects; prints a notice naming the client."""
    print(f"Plugin: Client {client_ip} connected")


def on_message_received(client_ip, message):
    """Called when message received.

    Returns:
        The (possibly modified) message, or None to block it. This example
        passes the message through unchanged.
    """
    # Process message
    return message  # Return modified message or None to block


def on_client_disconnect(client_ip):
    """Called when client disconnects; prints a notice naming the client."""
    print(f"Plugin: Client {client_ip} disconnected")
# server.py automatically loads plugins
import importlib
import os


def load_mods():
    """Import every plugin module found in the ``mods`` directory.

    Each ``*.py`` file whose name does not start with ``__`` is imported as
    ``mods.<name>``. Files are processed in sorted order so load order does
    not depend on the filesystem.

    Returns:
        A list of the imported module objects; empty when there is no
        ``mods`` directory.
    """
    try:
        entries = sorted(os.listdir("mods"))
    except FileNotFoundError:
        # No plugin directory simply means there is nothing to load.
        return []

    loaded = []
    for file in entries:
        if file.endswith(".py") and not file.startswith("__"):
            module_name = file[:-3]
            loaded.append(importlib.import_module(f"mods.{module_name}"))
    return loaded
# mods/client_plugin.py
def on_message_display(username, message):
    """Called before displaying message.

    Returns:
        The text to display, formatted here as ``[username]: message``.
    """
    # Modify display format
    return f"[{username}]: {message}"


def on_message_send(message):
    """Called before sending message.

    Returns:
        The text to send; this example upper-cases it.
    """
    # Modify outgoing message
    return message.upper()  # Example: convert to uppercase
# Increase socket backlog for high concurrency
server_socket.listen(100) # Default is 5
# Use connection pooling
connection_pool = []
# Limit message history
MAX_MESSAGES = 10000


def cleanup_old_messages():
    """Trim stored message history (placeholder, does nothing yet)."""
    # Remove messages older than threshold
    pass
# Use thread pools instead of unlimited threads
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=50)
# Cache recent messages for faster display
from collections import deque

MESSAGE_CACHE_SIZE = 100

# A bounded deque discards its oldest entry in O(1) once full, whereas
# list.pop(0) shifts every remaining element on each eviction.
message_cache = deque(maxlen=MESSAGE_CACHE_SIZE)


def cache_message(message):
    """Append *message* to the cache, keeping only the 100 most recent."""
    message_cache.append(message)
# Implement connection retry logic
import socket
import time


def connect_with_retry(host, port, max_retries=3):
    """Open a TCP connection, retrying refused attempts with backoff.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        max_retries: Maximum number of connection attempts.

    Returns:
        The connected socket object.

    Raises:
        ConnectionError: If every attempt was refused.
    """
    for attempt in range(max_retries):
        try:
            # The socket module has no module-level connect();
            # create_connection() builds and connects a socket in one step.
            return socket.create_connection((host, port))
        except ConnectionRefusedError:
            # Back off only when another attempt will follow.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
    raise ConnectionError("Max retries exceeded")


# Deletion table for the characters stripped by sanitize_message.
_DANGEROUS_CHARS = str.maketrans("", "", "<>&\"'")


def sanitize_message(message):
    """Return *message* with the characters < > & " ' removed."""
    # Remove dangerous characters in a single pass
    return message.translate(_DANGEROUS_CHARS)
# Implement proper authentication
import hashlib
import secrets


def generate_auth_token():
    """Return a random token of 64 hexadecimal characters (32 random bytes)."""
    return secrets.token_hex(32)


def hash_password(password, salt):
    """Derive a key from *password* with PBKDF2-HMAC-SHA256.

    Args:
        password: Password text; encoded to bytes before hashing.
        salt: Salt text; encoded to bytes before hashing.

    Returns:
        The derived key as bytes, computed with 100000 iterations.
    """
    return hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        salt.encode(),
        100000,
    )
# Add SSL/TLS support
import socket
import ssl


def create_secure_socket(server_hostname=None):
    """Create an unconnected TLS client socket that verifies the server.

    Args:
        server_hostname: Host name the server certificate must match. The
            default context has hostname checking enabled, so wrapping a
            socket without it fails.

    Returns:
        An ``ssl.SSLSocket`` wrapping a new TCP socket; the caller connects it.

    Raises:
        ValueError: If *server_hostname* is not given.
    """
    if server_hostname is None:
        raise ValueError("server_hostname is required for certificate verification")

    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        secure_sock = context.wrap_socket(sock, server_hostname=server_hostname)
    except Exception:
        # Do not leak the raw socket if wrapping fails.
        sock.close()
        raise
    return secure_sock
# Use descriptive variable names
client_socket = socket.socket() # Good
s = socket.socket() # Bad
# Function documentation
def authenticate_client(client_ip: str) -> bool:
"""
Authenticate a client connection.
Args:
client_ip: IP address of the client
Returns:
True if authentication successful, False otherwise
"""
pass
// Use consistent indentation
char* encode_character(char* input) {
char* output = malloc(3);
if (output == NULL) {
return NULL; // Handle allocation failure
}
// Implementation here
return output;
}
# Always handle exceptions appropriately
try:
client_socket.connect((host, port))
except ConnectionRefusedError:
logging.error(f"Cannot connect to {host}:{port}")
return False
except Exception as e:
logging.error(f"Unexpected error: {e}")
return False
# Use proper logging instead of print statements
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('server.log'),
logging.StreamHandler()
]
)
# Use configuration files
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
HOST = config.get('server', 'host', fallback='127.0.0.1')
PORT = config.getint('server', 'port', fallback=10740)
# Add metrics collection
import time
class Metrics:
def __init__(self):
self.connections = 0
self.messages_sent = 0
self.start_time = time.time()
def uptime(self):
return time.time() - self.start_time
- Create feature branch from main
- Implement changes with tests
- Run linting and type checking
- Submit pull request with description
- Address review feedback
- Merge after approval
- Update API documentation for new functions
- Add examples for new features
- Update installation guide if dependencies change
- Include troubleshooting information
- Unit tests for all new functions
- Integration tests for protocol changes
- Load testing for performance improvements
- Regression testing for bug fixes
This development guide provides the foundation for understanding and extending the General Communications system. For specific implementation details, refer to the source code and API documentation.