82 lines
2.6 KiB
Python
82 lines
2.6 KiB
Python
from typing import Optional
|
|
import mitmproxy.addons.savehar
|
|
from mitmproxy import http
|
|
from mitmproxy.types import Path
|
|
from mitmproxy.addonmanager import Loader
|
|
import threading
|
|
import time
|
|
import json
|
|
from http import HTTPStatus
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
class SaveHar2:
    """mitmproxy addon that buffers flows and exposes them as HAR data.

    Flows are accumulated in a wrapped ``mitmproxy.addons.savehar.SaveHar``
    instance. The buffered data is made available in two ways:

    * every ``FLUSH_EVERY`` calls to :meth:`response` (including the ones
      routed through :meth:`error`) the buffer is exported to
      ``OUTPUT_PATH``;
    * an embedded HTTP server bound to ``SERVER_ADDRESS`` answers
      ``GET /get_har`` with the HAR document and ``GET /get_stats`` with the
      number of buffered flows.
    """

    # Bind address of the embedded HTTP endpoint ('' = all interfaces).
    SERVER_ADDRESS = ('', 9111)
    # Number of response()/error() events between two exports to disk.
    FLUSH_EVERY = 5
    # File the HAR document is periodically written to.
    OUTPUT_PATH = "output.har"

    def __init__(self) -> None:
        """Create the wrapped SaveHar and start the HTTP endpoint thread."""
        self.save_har = mitmproxy.addons.savehar.SaveHar()
        self.counter = 0

        # Inside the handler ``self`` is the per-request handler object, so
        # the addon is captured under a separate name.
        addon = self

        class HTTPRequestHandler(BaseHTTPRequestHandler):
            """Serves the addon's buffered flows over HTTP."""

            def _send_json(self, payload: bytes) -> None:
                """Send ``payload`` as a complete 200 JSON response."""
                self.send_response(HTTPStatus.OK)
                self.send_header("Content-type", "application/json")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)

            def do_GET(self):
                """Dispatch ``/get_har`` and ``/get_stats``; 404 otherwise."""
                if self.path == '/get_har':
                    # Build the payload *before* the status line goes out so
                    # a serialization failure cannot produce an empty 200.
                    self._send_json(addon.get_har_json())
                elif self.path == '/get_stats':
                    count = len(addon.save_har.flows)
                    body = f'{{"connections": {count} }}\n'
                    self._send_json(body.encode("utf8"))
                else:
                    self.send_error(HTTPStatus.NOT_FOUND)

        self.httpd = HTTPServer(self.SERVER_ADDRESS, HTTPRequestHandler)
        print("Server started ...")

        # Daemon thread: the endpoint must never keep the process alive once
        # mitmproxy itself has finished.
        self.httpd_thread = threading.Thread(
            target=self.httpd.serve_forever,
            daemon=True,
        )
        self.httpd_thread.start()

    def done(self) -> None:
        """Stop the HTTP endpoint and release its port on addon shutdown."""
        self.httpd.shutdown()
        self.httpd.server_close()

    def save_har_to_file(self):
        """Export all buffered flows to ``OUTPUT_PATH`` via the SaveHar."""
        self.save_har.export_har(self.save_har.flows, Path(self.OUTPUT_PATH))

    def get_har_json(self) -> bytes:
        """Return the buffered flows as an indented, encoded JSON HAR."""
        har = self.save_har.make_har(self.save_har.flows)
        return json.dumps(har, indent=4).encode()

    def configure(self, updated) -> None:
        """Forward option changes to the wrapped SaveHar instance."""
        self.save_har.configure(updated)

    def _save_flow(self, flow: "http.HTTPFlow") -> None:
        """Append ``flow`` to the wrapped SaveHar's buffer."""
        self.save_har.flows.append(flow)

    def response(self, flow: "http.HTTPFlow") -> None:
        """Buffer a finished flow and periodically export the buffer.

        The flow is stored exactly once. Every ``FLUSH_EVERY``-th call
        writes the whole buffer to disk and restarts the count.
        """
        # websocket flows will receive a websocket_end,
        # we don't want to persist them here already
        if flow.websocket is None:
            self._save_flow(flow)

        self.counter += 1
        if self.counter >= self.FLUSH_EVERY:
            print("Outputing har")
            self.save_har_to_file()
            self.counter = 0

    def error(self, flow: "http.HTTPFlow") -> None:
        """Treat a failed flow like a completed one."""
        self.response(flow)

    def websocket_end(self, flow: "http.HTTPFlow") -> None:
        """Buffer a websocket flow once its connection has ended."""
        self._save_flow(flow)
|
|
|
|
# Addon instances that mitmproxy registers when it loads this script.
addons = [
    SaveHar2(),
]
|