#!/usr/bin/env ruby # typed: true require "sorbet-runtime" require "set" require "tmpdir" extend T::Sig START_FILES = T.let( %w[ github google youtube twitter facebook discord reddit wikimedia stackexchange libgen python ruby creativecommons sci-hub v2ray imgur npmjs onedrive matrix ], T::Array[String] ) sig { returns(T::Array[String]) } def download_data() # Create a temp directory temp_dir = Dir.mktmpdir # download url to temp_dir zip_file = File.join(temp_dir, "domain-list-community-master.zip") # zip_file = "master.zip" url = "https://github.com/v2fly/domain-list-community/archive/refs/heads/master.zip" `curl -sfL '#{url}' -o #{zip_file}` `unzip #{zip_file} -d #{temp_dir}` data_dir = File.join(temp_dir, "domain-list-community-master", "data") raise "data dir not found" if not Dir.exist?(data_dir) [data_dir, temp_dir] end class Entry extend T::Sig sig { params(type: String, value: String, attributes: T::Array[String]).void } def initialize(type, value, attributes) @type = type @value = value @attributes = attributes end attr_reader :type, :value, :attributes def to_s "#{type},#{value}" end end # Return nil if the line is a comment or empty # Return a string if the line is an include # Return an Entry if the line is a rule sig { params(line: String).returns(T.nilable(T.any(Entry, String))) } def handle_line(line) line.strip! return if line.empty? return if line.start_with?("#") fields = T .must(line.split("#")[0]) .split(" ") .map { |s| s.strip } .filter { |s| not s.empty? } rule = T.let(T.must(fields[0]), String) attributes = T.let([], T::Array[String]) for attribute in T.must(fields[1..]) if attribute.start_with?("@") attributes << T.must(attribute[1..]) else raise "Invalid attribute: #{attribute}" end end type = T.let("", String) value = T.let("", String) if rule.start_with?("include:") return rule["include:".length..] elsif rule.start_with?("domain:") type = "DOMAIN-SUFFIX" value = T.must(rule["domain:".length..]) elsif rule.start_with?("full:") type = "DOMAIN" value = T.must(rule["full:".length..]) elsif rule.start_with?("keyword:") type = "DOMAIN-KEYWORD" value = T.must(rule["keyword:".length..]) elsif rule.start_with?("regexp:") type = "URL-REGEX" value = T.must(rule["regexp:".length..]) else type = "DOMAIN-SUFFIX" value = rule end Entry.new(type, value, attributes) end sig do params( filename: String, data_dir: String, already_handled_files: T::Set[String], entries: T::Array[Entry] ).void end def handle_file(filename, data_dir, already_handled_files, entries) return if already_handled_files.include?(filename) already_handled_files.add filename file_path = File.join(data_dir, filename) # Read as UTF-8 File.open(file_path, "r:UTF-8") do |file| file.each_line() do |line| line_result = handle_line(line) if line_result.is_a?(Entry) entries << line_result elsif line_result.is_a?(String) handle_file(line_result, data_dir, already_handled_files, entries) end end end end sig do params(data_dir: String, start_files: T::Array[String]).returns( T::Array[Entry] ) end def handle_data(data_dir, start_files) already_handled_files = T.let(Set.new, T::Set[String]) result = T.let([], T::Array[Entry]) for filename in start_files handle_file(filename, data_dir, already_handled_files, result) end result end sig { params(entries: T::Array[Entry]).returns(T::Array[Entry]) } def get_entries_with_no_attribute(entries) entries.filter { |entry| entry.attributes.empty? } end sig do params(entries: T::Array[Entry], attribute: String).returns(T::Array[Entry]) end def get_entries_with_attribute(entries, attribute) entries.filter { |entry| entry.attributes.include?(attribute) } end sig { params(entries: T::Array[Entry]).void } def print_entries(entries) entries.each { |entry| puts entry.to_s } end ARGV.length == 1 or raise "Usage: generate-surge-geosite.rb " mode = ARGV[0] mode == "china" or mode == "global" or raise "Invalid mode: #{mode}" data_dir, temp_dir = download_data() entries = handle_data(T.must(data_dir), START_FILES) print_entries(get_entries_with_no_attribute(entries)) if mode == "global" print_entries(get_entries_with_attribute(entries, "cn")) if mode == "china" FileUtils.remove_entry(T.must(temp_dir))