-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.py
More file actions
98 lines (86 loc) · 2.86 KB
/
config.py
File metadata and controls
98 lines (86 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# config.py
import yaml
import os
from typing import Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class SplunkConfig:
host: str
port: int
token: str
index: str
@dataclass
class SlackConfig:
webhook_url: str
channel: str
@dataclass
class GCPConfig:
credentials_path: Optional[str]
projects: List[str]
class ConfigManager:
"""Manage Path Predict configuration"""
def __init__(self, config_path: str = "config.yaml"):
self.config_path = config_path
self.config = self.load_config()
def load_config(self) -> Dict[str, Any]:
"""Load configuration from YAML file"""
if not os.path.exists(self.config_path):
return self.default_config()
with open(self.config_path, 'r') as f:
return yaml.safe_load(f)
def default_config(self) -> Dict[str, Any]:
"""Return default configuration"""
return {
'integrations': {
'splunk': {
'enabled': False,
'host': 'localhost',
'port': 8088,
'token': '',
'index': 'main'
},
'slack': {
'enabled': False,
'webhook_url': '',
'channel': '#security-alerts'
}
},
'gcp': {
'credentials_path': None,
'projects': []
},
'alerting': {
'min_severity': 'HIGH'
}
}
def get_splunk_config(self) -> Optional[SplunkConfig]:
"""Get Splunk configuration"""
splunk_cfg = self.config.get('integrations', {}).get('splunk', {})
if splunk_cfg.get('enabled'):
return SplunkConfig(
host=splunk_cfg.get('host'),
port=splunk_cfg.get('port'),
token=splunk_cfg.get('token'),
index=splunk_cfg.get('index')
)
return None
def get_slack_config(self) -> Optional[SlackConfig]:
"""Get Slack configuration"""
slack_cfg = self.config.get('integrations', {}).get('slack', {})
if slack_cfg.get('enabled'):
return SlackConfig(
webhook_url=slack_cfg.get('webhook_url'),
channel=slack_cfg.get('channel', '#security-alerts')
)
return None
def get_gcp_config(self) -> GCPConfig:
"""Get GCP configuration"""
gcp_cfg = self.config.get('gcp', {})
return GCPConfig(
credentials_path=gcp_cfg.get('credentials_path'),
projects=gcp_cfg.get('projects', [])
)
def save_config(self):
"""Save configuration to file"""
with open(self.config_path, 'w') as f:
yaml.dump(self.config, f, default_flow_style=False)