# v2
from __future__ import annotations

import logging
import pickle
from pathlib import Path

logger = logging.getLogger(__name__)

# Prefixes that mark a whole line as a comment in hosts / adblock style lists.
_COMMENT_PREFIXES = ("#", "!", "//")

# Sink addresses that introduce a hosts-file entry ("0.0.0.0 ads.example.com").
_HOSTS_SINK_ADDRESSES = frozenset({"0.0.0.0", "127.0.0.1"})


def _normalize_domain(value: str) -> str:
    """Return *value* trimmed, lower-cased and without a trailing root dot."""
    return value.strip().lower().rstrip(".")


def load_domain_list(path: Path) -> set[str]:
    """Parse a blocklist file into a set of normalised domain names.

    Blank lines and lines starting with ``#``, ``!`` or ``//`` are skipped.
    A leading ``||`` is removed, a leading ``0.0.0.0`` / ``127.0.0.1`` field
    (separated by any whitespace) is removed together with an inline
    ``# comment``, and every ``^`` and ``/`` character is dropped.  Entries
    that contain no dot, or that still contain whitespace, are ignored.

    Args:
        path: Location of the blocklist file.

    Returns:
        The parsed domains; an empty set when *path* does not exist.
    """
    domains: set[str] = set()
    if not path.exists():
        return domains

    text = path.read_text(encoding="utf-8", errors="ignore")
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(_COMMENT_PREFIXES):
            continue

        if line.startswith("||"):
            line = line[2:]

        # Hosts-file syntax: "<sink address><whitespace><host> [# comment]".
        # Splitting on any whitespace also accepts tab-separated entries.
        fields = line.split(None, 1)
        if len(fields) == 2 and fields[0] in _HOSTS_SINK_ADDRESSES:
            line = fields[1].split("#", 1)[0]

        line = _normalize_domain(line.replace("^", "").replace("/", ""))

        # More than one whitespace-separated token means this is not a
        # single host name.
        if "." not in line or len(line.split()) > 1:
            continue
        domains.add(line)

    return domains


def save_cache(cache_file: Path, domains: set[str]) -> None:
    """Pickle *domains* to *cache_file*, creating parent directories first.

    Args:
        cache_file: Destination file; it is overwritten if it exists.
        domains: Domain names to persist.
    """
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    with cache_file.open("wb") as fh:
        pickle.dump(domains, fh, protocol=pickle.HIGHEST_PROTOCOL)


def load_cache(cache_file: Path) -> set[str]:
    """Load a domain set previously written by :func:`save_cache`.

    This is best-effort: a missing, unreadable or malformed cache yields an
    empty set instead of raising.  Failures other than a missing file are
    logged as warnings.

    Args:
        cache_file: Pickle file to read.

    Returns:
        The cached domains, normalised, or an empty set.
    """
    try:
        with cache_file.open("rb") as fh:
            # NOTE(security): unpickling can execute arbitrary code. The
            # cache file must only ever be written by save_cache() and live
            # somewhere untrusted parties cannot write to.
            data = pickle.load(fh)
        if not isinstance(data, set):
            logger.warning(
                "Ignoring blocklist cache %s: expected a set, got %s",
                cache_file,
                type(data).__name__,
            )
            return set()
        cleaned = {_normalize_domain(str(domain)) for domain in data if domain}
    except FileNotFoundError:
        return set()
    except Exception:
        # Unpickling corrupt data can raise almost any exception type, so the
        # handler stays broad, but the failure is no longer silent.
        logger.warning(
            "Ignoring unreadable blocklist cache %s",
            cache_file,
            exc_info=True,
        )
        return set()

    # Whitespace-only entries normalise to "" and must not count as domains.
    cleaned.discard("")
    return cleaned


class BlocklistEngine:
    """In-memory domain blocklist built from list files, with a pickle cache."""

    def __init__(self, blocklists: list[Path], cache_file: Path) -> None:
        """Remember the source files and cache location; nothing is loaded.

        Args:
            blocklists: Blocklist files to merge when (re)building.
            cache_file: Where the merged set is cached between runs.
        """
        self.blocklists = blocklists
        self.cache_file = cache_file
        self.domains: set[str] = set()

    def load(self, rebuild: bool = False) -> None:
        """Populate ``self.domains`` from the cache or from the blocklists.

        A non-empty cache is used unless *rebuild* is true.  Otherwise all
        blocklists are parsed and merged, and the result is written back to
        the cache file.

        Args:
            rebuild: Skip the cache and re-parse the blocklist files.
        """
        if not rebuild:
            # load_cache() already returns an empty set for a missing file.
            cached = load_cache(self.cache_file)
            if cached:
                self.domains = cached
                return

        merged: set[str] = set()
        for path in self.blocklists:
            merged.update(load_domain_list(path))

        self.domains = merged
        save_cache(self.cache_file, merged)

    def contains(self, host: str) -> bool:
        """Return whether *host* or any of its parent domains is listed.

        The host is normalised first (trimmed, lower-cased, trailing root dot
        removed), so ``"Sub.Example.com."`` matches an ``example.com`` entry.

        Args:
            host: Host name to test.

        Returns:
            ``True`` if some dot-separated suffix of *host* is in the set.
        """
        host = _normalize_domain(host)
        if not host:
            return False

        labels = host.split(".")
        return any(
            ".".join(labels[start:]) in self.domains
            for start in range(len(labels))
        )

    def count(self) -> int:
        """Return the number of loaded domains."""
        return len(self.domains)

    def reload(self) -> None:
        """Re-parse the blocklists, ignoring and then overwriting the cache."""
        self.load(rebuild=True)

    def empty(self) -> bool:
        """Return ``True`` when no domains are loaded."""
        return not self.domains