forked from browser-use/web-ui
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilter_mitm_logs.py
115 lines (92 loc) · 3.92 KB
/
filter_mitm_logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import json
import logging
import os
from datetime import datetime
from urllib.parse import urlparse
# Example defaults when running against a local mitmproxy capture:
# INPUT_FILE = "mitmproxy_endpoint_log.jsonl"
# OUTPUT_FILE = "filtered_mitmproxy_endpoint_log.jsonl"
# Configure the root logger once at import time so the module-level
# logging.info/logging.error calls below emit at INFO and above.
logging.basicConfig(level=logging.INFO)
def extract_domain(url):
    """Return the network-location part (host[:port]) of *url*.

    Falls back to None (after logging) if parsing raises; note that for a
    string without a scheme, urlparse returns an empty netloc rather than
    raising, so "" is a possible return value.
    """
    try:
        return urlparse(url).netloc
    except Exception as exc:
        logging.error(f"Error extracting domain: {exc}")
        return None
def convert_date_to_unix(date_str):
    """Convert an HTTP-style date string to an integer Unix timestamp.

    Expects the RFC-1123 shape used by HTTP Date headers, e.g.
    "Wed, 21 Oct 2015 07:28:00 GMT". Returns None (after logging) when the
    string does not match the format.

    Bug fixed: strptime's %Z produces a *naive* datetime, and a naive
    datetime's .timestamp() is interpreted in the machine's local timezone —
    skewing GMT header dates by the local UTC offset. We attach UTC
    explicitly before converting.
    """
    from datetime import timezone  # local import; only this function needs it
    try:
        dt = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S %Z")
        # Pin the parsed wall-clock time to UTC, then take the epoch seconds.
        return int(dt.replace(tzinfo=timezone.utc).timestamp())
    except ValueError as e:
        # Use module logging (was print) for consistency with the rest of the file.
        logging.error(f"Error converting date: {e}")
        return None
def clean_malformed_json(line):
    """Best-effort repair of common malformations in a single JSON line.

    Heuristics applied (in order): balance curly braces, drop trailing commas
    before closing brackets, and null-fill a key whose value is missing.
    The cleaned string is returned; on any unexpected error the input is
    returned unchanged (after logging).

    Bugs fixed vs. the original:
    - only ONE '}' was appended even when several were missing;
    - `line.rstrip('}')` stripped ALL trailing closing braces instead of
      just the surplus, which could corrupt otherwise-valid endings.
    """
    try:
        cleaned = line.rstrip()  # trailing newline would hide the final brace
        opens = cleaned.count('{')
        closes = cleaned.count('}')
        if opens > closes:
            # Append exactly as many closers as are missing.
            cleaned += '}' * (opens - closes)
        elif closes > opens:
            # Remove only the surplus closers, one at a time from the end.
            surplus = closes - opens
            while surplus and cleaned.endswith('}'):
                cleaned = cleaned[:-1]
                surplus -= 1
        # Remove trailing commas (same order as before: objects, then arrays).
        cleaned = cleaned.replace(",}", "}")
        cleaned = cleaned.replace(",]", "]")
        # Handle cases where the value is missing after a key.
        cleaned = cleaned.replace('":,', '":null,')
        line = cleaned
    except Exception as e:
        # Use module logging (was print) for consistency with the rest of the file.
        logging.error(f"Error during cleaning: {e}")
    return line
def filter_jsonl_file(input_file, output_file, url):
    """Filter a mitmproxy JSONL log down to records in scope for one domain.

    A record is kept when its "url" field resolves to the same domain as
    *url*, or when it is a "response" event whose Referer/Origin/Host header
    mentions that domain. Kept records are re-serialized to *output_file*,
    one JSON object per line. Lines that fail to parse are run through
    clean_malformed_json() once before being skipped.

    Side effect: when *url* is truthy it is exported as SCOPE_URL; otherwise
    the scope is read back from that environment variable.

    Returns None. Missing input file and I/O errors are logged, not raised.

    Bugs fixed vs. the original:
    - line_number was only incremented on some code paths, so the line
      numbers in log messages drifted; enumerate() keeps them accurate;
    - os.makedirs(os.path.dirname(output_file)) raised FileNotFoundError
      when output_file had no directory component (dirname == "").
    """
    try:
        if url:
            os.environ["SCOPE_URL"] = url
        else:
            url = os.environ.get("SCOPE_URL")
        dom = extract_domain(url)
        logging.info(f"Dom extracted from file-{dom}")
        if not dom:
            return
        out_dir = os.path.dirname(output_file)
        if out_dir:  # makedirs("") raises; a bare filename needs no directory
            os.makedirs(out_dir, exist_ok=True)  # Ensure output directory exists
        with open(input_file, "r", encoding="utf-8") as infile, open(output_file, "w", encoding="utf-8") as outfile:
            for line_number, line in enumerate(infile, start=1):
                try:
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        logging.error(f"Skipping line {line_number}: Invalid JSON format. Retrying")
                        try:
                            record = json.loads(clean_malformed_json(line))
                        except json.JSONDecodeError:
                            logging.error(f"Skipping line {line_number}: Invalid JSON format after cleaning. Skipping.")
                            continue
                    # Records without a url are out of scope entirely.
                    if not record.get("url"):
                        continue
                    # Direct domain match on the record's own URL.
                    if dom == extract_domain(record["url"]):
                        outfile.write(json.dumps(record) + "\n")
                        continue
                    # Otherwise keep responses whose headers reference the domain.
                    if record.get("event") == "response" and "headers" in record:
                        headers = record["headers"]
                        if ("Referer" in headers and dom in headers["Referer"]) \
                                or ("Origin" in headers and dom in headers["Origin"]) \
                                or ("Host" in headers and dom in headers["Host"]):
                            outfile.write(json.dumps(record) + "\n")
                except KeyError as e:
                    logging.error(f"Skipping line: {e}")
        logging.info(f"Filtered output written to: {output_file}")
    except FileNotFoundError:
        logging.error(f"Error: The file {input_file} was not found.")
        return None
    except IOError as e:
        logging.error(f"Error reading/writing files: {e}")
        return None
if __name__ == "__main__":
    # Default scope domain used when this module is run as a script.
    URL = "https://shinobi.security"
    # The actual invocation is disabled; uncomment (and define INPUT_FILE /
    # OUTPUT_FILE above) to run the filter standalone.
    # filter_jsonl_file(INPUT_FILE, OUTPUT_FILE, URL)