-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
95 lines (79 loc) · 3.18 KB
/
lambda_function.py
File metadata and controls
95 lines (79 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import gzip
import io
import json
import os
from urllib.parse import unquote_plus

import boto3
# Event names that are treated as suspicious when they appear in a log record.
# Extend this list to match your security requirements, for example by
# adding "CreateAccessKey".
sus_events = [
    "CreateUser",
    "DeleteUser",
    "AttachUserPolicy",
    "StopLogging",
    "PutBucketPolicy",
]
# ARN of the SNS topic that receives alerts; None when SNS_TOPIC_ARN is unset.
sns_topic_arn = os.environ.get('SNS_TOPIC_ARN')
def _response(status_code, message):
    """Build the handler's return value: a status code plus a JSON-encoded body."""
    return {
        'statusCode': status_code,
        'body': json.dumps(message)
    }


def _find_suspicious_activities(records, watched_events):
    """Return a summary dict for every log record whose eventName is watched.

    Entries that are not dicts are skipped. A record without a
    userIdentity.userName is reported as 'Unknown User'.
    """
    return [
        {
            'eventName': record.get('eventName'),
            'user': record.get('userIdentity', {}).get('userName', 'Unknown User'),
            'eventTime': record.get('eventTime'),
        }
        for record in records
        if isinstance(record, dict) and record.get('eventName') in watched_events
    ]


def _format_alert(activities):
    """Render the alert text: a header line followed by one line per activity."""
    lines = ["Suspicious activities detected:"]
    lines.extend(
        f"User: {activity['user']}, Event: {activity['eventName']}, Time: {activity['eventTime']}"
        for activity in activities
    )
    return "\n".join(lines)


def lambda_handler(event, context):
    """Scan a gzipped JSON log file announced by an S3 event and alert via SNS.

    The bucket name and object key are taken from the first record of the
    event. The object is downloaded, decompressed and parsed, and every log
    record whose eventName is listed in ``sus_events`` is collected. When at
    least one is found, a single SNS message listing all of them is
    published to ``sns_topic_arn``.

    Args:
        event: Trigger payload; expected to carry
            ``Records[0].s3.bucket.name`` and ``Records[0].s3.object.key``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``: 400 for a
        malformed event or an unreadable log file, 200 otherwise.

    Raises:
        Errors raised by the S3 ``get_object`` or SNS ``publish`` calls are
        not caught and propagate to the caller.
    """
    # Validate the event before creating any AWS client.
    if not isinstance(event, dict) or not event.get('Records'):
        print("No Records found in the event. Check your trigger source.")
        return _response(400, 'Invalid event format: Missing Records.')

    # NOTE: only the first record of the event is processed.
    try:
        s3_info = event['Records'][0]['s3']
        bucket_name = s3_info['bucket']['name']
        # S3 event notifications URL-encode the object key (spaces arrive
        # as '+'), so decode it before asking S3 for the object.
        object_key = unquote_plus(s3_info['object']['key'])
    except (KeyError, IndexError, TypeError):
        print("Event record has no S3 bucket/object information. Check your trigger source.")
        return _response(400, 'Invalid event format: Missing S3 bucket or object key.')

    print(f"New file detected: s3://{bucket_name}/{object_key}")

    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    compressed_body = response['Body'].read()

    try:
        decompressed_data = gzip.decompress(compressed_body)
    except (OSError, EOFError) as e:
        # gzip.BadGzipFile is an OSError; EOFError signals a truncated stream.
        print(f"GZIP Decompress Error: {str(e)}")
        return _response(400, 'Invalid GZIP format.')

    if not decompressed_data:
        print("Log file is empty. Skipping...")
        return _response(200, 'Empty log file. Skipped.')

    try:
        log_data = json.loads(decompressed_data.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"JSON Decode Error: {str(e)}")
        return _response(400, 'Invalid JSON format.')

    if not isinstance(log_data, dict):
        print("JSON Decode Error: top-level value is not an object")
        return _response(400, 'Invalid JSON format.')

    # "Records" may be absent or null; both mean there is nothing to scan.
    sus_activities = _find_suspicious_activities(log_data.get('Records') or [], sus_events)

    if not sus_activities:
        print("not suspicious")
        return _response(200, "scan complete")

    # One message lists every finding, so no detail is overwritten or dropped.
    alert_message = _format_alert(sus_activities)
    print(alert_message)

    print("Sending alert via SNS")
    sns = boto3.client('sns')
    sns.publish(
        TopicArn=sns_topic_arn,
        Subject='AWS Suspicious activity detected',
        Message=alert_message
    )

    return _response(200, "scan complete")