diff --git a/bin/chardict.py b/bin/chardict.py
index 5ec3e5d..5ac50f1 100755
--- a/bin/chardict.py
+++ b/bin/chardict.py
@@ -1,50 +1,71 @@
 #!/usr/bin/env python3
-"""Turn corpus texts into dictionaries of symbols, bigrams and trigrams."""
+"""Turn corpus texts into dictionaries of n-grams."""
 
 import json
-from os import listdir, path
+from pathlib import Path
 from sys import argv
 
-IGNORED_CHARS = "1234567890 \t\r\n\ufeff"
-
-
-def parse_corpus(file_path):
-    """Count symbols, bigrams and trigrams in a text file."""
-
-    symbols = {}
-    bigrams = {}
-    trigrams = {}
-    char_count = 0
-    prev_symbol = None
-    prev_prev_symbol = None
-
-    # get a dictionary of all symbols (letters, punctuation marks...)
-    file = open(file_path, "r", encoding="utf-8")
-    for char in file.read():
-        symbol = char.lower()
-        if char not in IGNORED_CHARS:
-            char_count += 1
-            if symbol not in symbols:
-                symbols[symbol] = 0
-            symbols[symbol] += 1
-            if prev_symbol is not None:
-                bigram = prev_symbol + symbol
-                if bigram not in bigrams:
-                    bigrams[bigram] = 0
-                bigrams[bigram] += 1
-                if prev_prev_symbol is not None:
-                    trigram = prev_prev_symbol + bigram
-                    if trigram not in trigrams:
-                        trigrams[trigram] = 0
-                    trigrams[trigram] += 1
-            prev_prev_symbol = prev_symbol
-            prev_symbol = symbol
-        else:
-            prev_symbol = None
-    file.close()
+NGRAM_MAX_LENGTH = 4  # exclusive upper bound: n-grams of 1 to 3 symbols (trigrams)
+IGNORED_CHARS = "1234567890 \t\r\n\ufeff↵"
+
+
+def parse_corpus(txt: str) -> tuple:
+    """Count the n-grams of a string, case-insensitively.
+
+    Returns a (ngrams, ngrams_count) tuple, both keyed by n-gram length
+    (from 1 up to, but not including, NGRAM_MAX_LENGTH):
+    ngrams[1]=symbols
+    ngrams[2]=bigrams
+    ngrams[3]=trigrams
+    ngrams[2] is shaped as { "aa": frequency in percent }
+    ngrams_count[2] is the total number of bigrams counted in txt
+    n-grams containing one of IGNORED_CHARS are not counted.
+    """
+
+    ngrams = {}
+    ngrams_count = {}  # ngrams_count counts the total number of ngrams[i] in corpus.
+
+    txt = txt.lower()  # we want to be case **in**sensitive
+
+    for ngram in range(1, NGRAM_MAX_LENGTH):
+        ngrams[ngram] = {}
+        ngrams_count[ngram] = 0
+
+    def get_ngram(txt: str, ngram_start: int, ngram_length: int) -> str:
+        """get a ngram of a given length at given position in txt
+        returns empty string if ngram cannot be provided"""
+        if txt[ngram_start] in IGNORED_CHARS:
+            return ""
+        if ngram_length <= 0:
+            return ""
+        # an n-gram may end exactly at the end of txt, but not past it
+        if ngram_start + ngram_length > len(txt):
+            return ""
+
+        ngram = txt[ngram_start : ngram_start + ngram_length]
+
+        for n in ngram[1:]:  # 1st char already tested
+            if n in IGNORED_CHARS:
+                return ""
+
+        return ngram
+
+    # get all n-grams
+    for ngram_start in range(len(txt)):
+        for ngram_length in range(1, NGRAM_MAX_LENGTH):
+            _ngram = get_ngram(txt, ngram_start, ngram_length)
+
+            if not _ngram:  # _ngram is ""
+                continue
+
+            if _ngram not in ngrams[ngram_length]:
+                ngrams[ngram_length][_ngram] = 0
+
+            ngrams[ngram_length][_ngram] += 1
+            ngrams_count[ngram_length] += 1
 
     # sort the dictionary by symbol frequency (requires CPython 3.6+)
-    def sort_by_frequency(table, precision=3):
+    def sort_by_frequency(table: dict, char_count: int, precision: int = 3) -> dict:
         sorted_dict = {}
         for key, count in sorted(table.items(), key=lambda x: -x[1]):
             freq = round(100 * count / char_count, precision)
@@ -52,27 +73,51 @@ def sort_by_frequency(table, precision=3):
                 sorted_dict[key] = freq
         return sorted_dict
 
-    results = {}
-    results["corpus"] = file_path
-    results["symbols"] = sort_by_frequency(symbols)
-    results["bigrams"] = sort_by_frequency(bigrams, 4)
-    results["trigrams"] = sort_by_frequency(trigrams)
-    return results
+    for ngram in range(1, NGRAM_MAX_LENGTH):
+        ngrams[ngram] = sort_by_frequency(ngrams[ngram], ngrams_count[ngram], 4)
+
+    return ngrams, ngrams_count
+
+
+def read_corpus(file: Path, encoding="utf-8") -> dict:
+    """Read a text file and return its n-gram frequencies and counts.
+
+    The result is shaped as {"freq": ..., "count": ...}, holding the two
+    values returned by parse_corpus.  When the file is missing or cannot
+    be read or decoded, the error is printed, then re-raised.
+    """
+    try:
+        if not file.is_file():
+            raise FileNotFoundError(f"{file} is not a file")
+        with file.open("r", encoding=encoding) as f:
+            corpus_txt = "↵".join(f.readlines())
+
+    except (OSError, UnicodeError) as e:
+        print(f"file does not exist or could not be read.\n {e}")
+        raise  # without the text there is nothing to parse
+
+    ngrams_freq, ngrams_count = parse_corpus(corpus_txt)
+    return {
+        "freq": ngrams_freq,
+        "count": ngrams_count,
+    }
 
 
 if __name__ == "__main__":
     if len(argv) == 2:  # convert one file
-        data = parse_corpus(argv[1])
+        file = Path(argv[1])
+        data = read_corpus(file)
+        output_file_path = file.parent / f"{file.stem}.json"
+        with open(output_file_path, "w", encoding="utf-8") as outfile:
+            json.dump(data, outfile, indent=4, ensure_ascii=False)
         print(json.dumps(data, indent=4, ensure_ascii=False))
+
     else:  # converts all *.txt files in the script directory
-        bin_dir = path.dirname(__file__)
-        destdir = path.join(bin_dir, "..", "txt")
-        txtdir = path.join(bin_dir, "..", "txt")
-        for filename in listdir(txtdir):
-            if filename.endswith(".txt"):
-                basename = filename[:-4]
-                print(f"... {basename}")
-                data = parse_corpus(path.join(txtdir, filename))
-                destfile = path.join(destdir, basename + ".json")
-                with open(destfile, "w", encoding="utf-8") as outfile:
+        txt_dir = Path(__file__).resolve().parent.parent / "txt"
+        for file in sorted(txt_dir.glob("*.txt")):
+            if file.is_file():
+                print(f"... {file.stem}")
+                data = read_corpus(file)
+                output_file_path = txt_dir / f"{file.stem}.json"
+                with open(output_file_path, "w", encoding="utf-8") as outfile:
                     json.dump(data, outfile, indent=4, ensure_ascii=False)
diff --git a/bin/merge.py b/bin/merge.py
index 5efe94b..3c466ee 100755
--- a/bin/merge.py
+++ b/bin/merge.py
@@ -3,43 +3,117 @@
 
 import json
 from sys import argv
+from pathlib import Path
 
 
-def merge(filenames, filecount):
-    merged = {
-        "symbols": {},
-        "bigrams": {},
-        "trigrams": {},
-    }
+# sort the merged dictionary by symbol frequency (requires CPython 3.6+)
+def _sort_ngram_by_frequency(table, precision=3):
+    """Return a copy of table sorted by decreasing value.
+
+    Values are rounded to precision digits; entries that round to 0 (or
+    less) are dropped.
+    """
+    sorted_dict = {}
+    for key, count in sorted(table.items(), key=lambda x: -x[1]):
+        freq = round(count, precision)
+        if freq > 0:
+            sorted_dict[key] = freq
+    return sorted_dict
+
+
+def sort_by_frequency(corpus: dict, precision=3):
+    """Sort every n-gram table of corpus["freq"] and return corpus."""
+    for index in range(1, len(corpus["freq"].keys())+1):
+        ngram = str(index)
+        corpus["freq"][ngram] = _sort_ngram_by_frequency(
+            corpus["freq"][ngram], precision
+        )
+    return corpus
 
-    # merge dictionaries
+
+def read_corpora(filenames: list[Path]) -> list[dict]:
+    """Load the JSON corpus stored in each of the given files.
+
+    Files that cannot be read or parsed are reported and skipped.
+    """
+    corpora = []
     for filename in filenames:
-        with open(filename, "r") as corpus:
-            data = json.load(corpus)
-            for section in merged.keys():
-                for key, count in data[section].items():
-                    if key not in merged[section]:
-                        merged[section][key] = 0.0
-                    merged[section][key] += count / filecount
-
-    # sort the merged dictionary by symbol frequency (requires CPython 3.6+)
-    def sort_by_frequency(table, precision=2):
-        sorted_dict = {}
-        for key, count in sorted(table.items(), key=lambda x: -x[1]):
-            freq = round(count, precision)
-            if freq > 0:
-                sorted_dict[key] = freq
-        return sorted_dict
-
-    results = {}
-    results["corpus"] = ""
-    results["symbols"] = sort_by_frequency(merged["symbols"])
-    results["bigrams"] = sort_by_frequency(merged["bigrams"], 4)
-    results["trigrams"] = sort_by_frequency(merged["trigrams"])
-    return results
+        try:
+            with open(filename, encoding="utf-8") as f:
+                corpus = json.load(f)
+                corpora.append(corpus)
+        except (OSError, ValueError):  # unreadable file, bad encoding or bad JSON
+            print(
+                f"Warning: cannot open the `{filename.stem}` corpus; skipping this file"
+            )
+            continue
+    return corpora
+
+
+def mergeable(corpora:list[dict]) -> bool:
+    """Check that corpora can be merged.
+
+    At least two corpora are needed, and all of them must hold the same
+    number of n-gram lengths; the reason is printed otherwise.
+    """
+    if len(corpora) < 2:
+        print("Error: at least 2 corpuses are needed to merge, aborting")
+        return False
+
+    # mix() reads every corpus with the n-gram lengths of the first one
+    ngram_length = len(corpora[0]["freq"])
+    if any(len(corpus["freq"]) != ngram_length for corpus in corpora):
+        print("Error: cannot merge because corpus file format is different; all corpuses do not have the same ngram length")
+        return False
+
+    return True
+
+
+def mix(corpora:list[dict], weights:list[float]=None) -> dict:
+    """Merge corpora of same n-gram length into a single corpus.
+
+    Counts are added up; frequencies are averaged with the given weights
+    (one per corpus, adding up to 1), or with equal weights by default.
+    Raises ValueError when the weights do not fit the corpora.
+    """
+    weights = weights or []
+    if weights == []:
+        # merge with same weight by default
+        weights = [ 1/len(corpora) ] * len(corpora)
+    elif len(weights) != len(corpora):
+        raise ValueError("Error: one merge ratio is needed per corpus; aborting merge")
+    elif round(sum(weights),1) != 1:
+        raise ValueError("Error: provided merge ratio do not add-up to 1; aborting merge")
+
+    ngram_length = range(1, len(corpora[0]["freq"].keys()) +1)
+
+    output_corpus = {
+        "freq": {str(n):{} for n in ngram_length},
+        "count": {str(n):0 for n in ngram_length},
+    }
+
+    for index in ngram_length:
+        n = str(index)
+        for corpus_index, corpus in enumerate(corpora):
+            output_corpus["count"][n] += corpus["count"][n]
+            for ngram in corpus["freq"][n]:
+                if ngram not in output_corpus["freq"][n]:
+                    output_corpus["freq"][n][ngram] = 0
+                output_corpus["freq"][n][ngram] += corpus["freq"][n][ngram] * weights[corpus_index]
+    return output_corpus
 
 
 if __name__ == "__main__":
     argl = len(argv) - 1  # number of files to merge
     if argl >= 2:
-        print(json.dumps(merge(argv[1:], argl), indent=4, ensure_ascii=False))
+        files = [Path(f) for f in argv[1:]]
+        corpora = read_corpora(files)
+        if not mergeable(corpora):
+            print("Error: cannot merge corpora, aborting")
+            raise SystemExit(1)  # report the failure to the calling shell
+        name = "mixed"
+        # same precision as the frequencies written by chardict.py
+        corpus = sort_by_frequency(mix(corpora), precision=4)
+        with open(f"{name}.json", "w", encoding="utf-8") as outfile:
+            json.dump(corpus, outfile, indent=4, ensure_ascii=False)
+        print(json.dumps(corpus, indent=4, ensure_ascii=False))