141 lines
3.8 KiB
Python
141 lines
3.8 KiB
Python
# ruff: noqa: D100, D101, D102, D103, G004, TRY400, PTH123, RUF012
|
|
import argparse
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from math import ceil
|
|
from os import getenv
|
|
from time import perf_counter
|
|
|
|
|
|
# Configure logging with color formatting
|
|
class CustomFormatter(logging.Formatter):
    """Log formatter that colors each record with an ANSI escape sequence.

    Every record is rendered as ``<color>[LEVELNAME] <message><reset>``, where
    the color is looked up from the record's numeric level.  Only the level
    name and the interpolated message are emitted: exception/stack information
    attached to the record and any format string given to the constructor are
    not part of the output.
    """

    # ANSI escape sequences used to colorize terminal output.
    GREY: str = "\x1b[38;20m"
    YELLOW: str = "\x1b[33;20m"
    RED: str = "\x1b[31;20m"
    BOLD_RED: str = "\x1b[31;1m"
    RESET: str = "\x1b[0m"

    # Standard logging level -> color prefix.
    COLOR_MAP: dict[int, str] = {
        logging.DEBUG: GREY,
        logging.INFO: GREY,
        logging.WARNING: YELLOW,
        logging.ERROR: RED,
        logging.CRITICAL: BOLD_RED,
    }

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* as a colorized ``[LEVEL] message`` string.

        Levels that are missing from ``COLOR_MAP`` (custom levels) fall back
        to ``GREY``.
        """
        prefix: str = self.COLOR_MAP.get(record.levelno, self.GREY)
        body: str = f"[{record.levelname}] {record.getMessage()}"
        return prefix + body + self.RESET
|
|
|
|
|
|
# Set up logging
|
|
# Stream handler that renders records through the colorizing formatter.
color_handler: logging.StreamHandler = logging.StreamHandler()
color_handler.setFormatter(CustomFormatter())

# Module logger; its threshold comes from the LOGLEVEL environment variable
# (case-insensitive) and defaults to INFO when the variable is unset.
logger: logging.Logger = logging.getLogger(__name__)
logger.setLevel(getenv("LOGLEVEL", "INFO").upper())
logger.addHandler(color_handler)
|
|
|
|
|
|
def parse_arguments() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    Three options are defined, all of them required: ``--inputfile``,
    ``--targetformat`` and ``--outputfile``.  They end up on the returned
    namespace as ``inputfile``, ``targetformat`` and ``outputfile``.
    """
    # (flag, metavar, help text) for every option, in --help display order.
    option_specs: tuple[tuple[str, str, str], ...] = (
        (
            "--inputfile",
            "INPUT_FILE",
            "Path to the input JSON file containing the blocking list",
        ),
        (
            "--targetformat",
            "TARGET_FORMAT",
            "Target output format (e.g., adguard)",
        ),
        (
            "--outputfile",
            "OUTPUT_FILE",
            "Path to the output file",
        ),
    )

    cli: argparse.ArgumentParser = argparse.ArgumentParser(
        description="Convert our JSON blocking list to popular blocking list formats",
    )
    for flag, placeholder, help_text in option_specs:
        cli.add_argument(flag, required=True, metavar=placeholder, help=help_text)
    return cli.parse_args()
|
|
|
|
|
|
def load_data(filename: str) -> dict:
    """Read the file at *filename* and return its parsed JSON content.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale encoding, so non-ASCII content is read the same way on every
    system.

    Raises:
        FileNotFoundError: *filename* does not exist.
        OSError: the file cannot be opened or read for another reason.
        json.JSONDecodeError: the content is not valid JSON.
        UnicodeDecodeError: the content is not valid UTF-8.
    """
    with open(filename, encoding="utf-8") as file:
        return json.load(file)
|
|
|
|
|
|
class UnsupportedTargetFormatError(Exception):
    """Raised when a conversion is requested for an unknown target format."""
|
|
|
|
|
|
def convert(data: dict, target_format: str) -> str:
    """Convert the loaded blocking list *data* into *target_format*.

    ``"adguard"`` is the only format handled; it is delegated to
    ``adguard_conversion``.

    Raises:
        UnsupportedTargetFormatError: *target_format* is anything else.
    """
    if target_format == "adguard":
        return adguard_conversion(data)
    raise UnsupportedTargetFormatError
|
|
|
|
|
|
def adguard_conversion(data: dict) -> str:
    """Render the blocking list *data* as AdGuard filter text.

    The result starts with two ``!`` comment lines (a UTC generation
    timestamp and a credit line), followed by one ``||<fqdn>^`` rule for each
    entry of ``data["domains"]`` whose ``"exclude"`` value is missing or
    falsy.  Lines are joined with ``"\n"`` and there is no trailing newline.

    Excluded entries are skipped before their ``"fqdn"`` is looked up, so they
    do not need to carry that key.

    Raises:
        KeyError: ``data`` has no ``"domains"`` key, or an entry that is not
            excluded has no ``"fqdn"`` key.
    """
    # For UTC, "%Z%z" renders as "UTC+0000".
    generated_at: str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z%z")

    lines: list[str] = [
        f"! Blocking list automatically generated at {generated_at}",
        "! Created with ❤️ by internet-czas-dzialac.pl",
    ]
    lines.extend(
        f"||{entry['fqdn']}^"
        for entry in data["domains"]
        if not entry.get("exclude", False)
    )
    return "\n".join(lines)
|
|
|
|
|
|
def dump_output(data: str, output_file: str) -> None:
    """Write *data* to *output_file*, replacing any existing content.

    The file is encoded explicitly as UTF-8.  The generated list header
    contains a non-ASCII character, which a locale-dependent default encoding
    may be unable to represent.

    Raises:
        OSError: the file cannot be opened or written.
    """
    with open(output_file, "w", encoding="utf-8") as file:
        file.write(data)
|
|
|
|
|
|
def main() -> None:
    """Run one conversion: parse arguments, load, convert, write, report time.

    A failure while loading the input or an unsupported target format is
    logged and ends the run early with a plain ``return``.  Errors raised
    while converting (other than the unsupported-format case) or while writing
    the output are not handled here and propagate to the caller.
    """
    # Reference point for the elapsed-time report at the end.
    started_at: float = perf_counter()

    cli_args: argparse.Namespace = parse_arguments()

    # Read the input blocking list.
    try:
        blocklist: dict = load_data(cli_args.inputfile)
    except FileNotFoundError:
        logger.error(f"File {cli_args.inputfile} not found!")
        return
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON: {e}")
        return
    except Exception as e:
        logger.critical(f"Unexpected error occurred: {e}")
        return

    # Produce the text in the requested format.
    try:
        rendered = convert(blocklist, cli_args.targetformat)
    except UnsupportedTargetFormatError:
        logger.error('Unsupported format. For now only "adguard" is supported.')
        return

    # Persist the generated list.
    dump_output(rendered, cli_args.outputfile)

    # Report the wall-clock duration, rounded up to whole milliseconds.
    elapsed_seconds: float = perf_counter() - started_at
    elapsed_ms: int = ceil(elapsed_seconds * 1000)
    logger.info(f"Generated in {elapsed_ms} ms")
|
|
|
|
|
|
# Run the conversion only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|