Andrii Dokhniak cab43f63ce mitmproxy
2025-08-30 18:27:49 +02:00

82 lines
2.6 KiB
Python

from typing import Optional
import mitmproxy.addons.savehar
from mitmproxy import http
from mitmproxy.types import Path
from mitmproxy.addonmanager import Loader
import threading
import time
import json
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
class SaveHar2:
def __init__(self) -> None:
self.save_har = mitmproxy.addons.savehar.SaveHar()
self.counter = 0
server_address = ('', 9111)
parr = self
class HTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/get_har':
print(parr.save_har.flows);
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.end_headers()
har = parr.get_har_json()
self.wfile.write(har)
if self.path == '/get_stats':
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.end_headers()
har = parr.get_har_json()
self.wfile.write(("{\"connections\": " + str(len(parr.save_har.flows)) +" }\n").encode("utf8"))
self.httpd = HTTPServer(server_address, HTTPRequestHandler)
print("Server started ...")
self.httpd_thread = threading.Thread(target=self.httpd.serve_forever)
self.httpd_thread.start()
def save_har_to_file(self):
self.save_har.export_har(self.save_har.flows, Path("output.har"))
def get_har_json(self) -> bytes:
return json.dumps(self.save_har.make_har(self.save_har.flows), indent=4).encode()
def configure(self, updated):
self.save_har.configure(updated);
def _save_flow(self, flow: http.HTTPFlow) -> None:
self.save_har.flows.append(flow)
def response(self, flow: http.HTTPFlow) -> None:
# websocket flows will receive a websocket_end,
# we don't want to persist them here already
if flow.websocket is None:
self._save_flow(flow)
if flow.websocket is None:
self._save_flow(flow)
self.counter += 1
if (self.counter == 5):
print("Outputing har")
print(self.save_har.flows)
self.save_har_to_file()
self.counter = 0
def error(self, flow: http.HTTPFlow) -> None:
self.response(flow)
def websocket_end(self, flow: http.HTTPFlow) -> None:
self._save_flow(flow)
addons = [SaveHar2()]