Source code for pagexml.analysis.text_stats

from __future__ import annotations

import json
import re
import string
from collections import Counter
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple, Union

import numpy as np

import pagexml.helper.text_helper as text_helper
import pagexml.model.physical_document_model as pdm

_SMALL = 1e-20

wpl_to_cat = {}
wpl_cat_min = {}
wpl_cat_max = {}

for wpl in range(0, 101):
    if wpl > 0:
        wpl_cat = (np.log(wpl) * 2).round(0) / 2
    else:
        wpl_cat = -np.inf
    wpl_to_cat[wpl] = wpl_cat
    if wpl_cat not in wpl_cat_min:
        wpl_cat_min[wpl_cat] = wpl
    wpl_cat_max[wpl_cat] = wpl

wpl_cat_range = {wpl_cat: f"{wpl_cat_min[wpl_cat]}-{wpl_cat_max[wpl_cat]}" for wpl_cat in wpl_cat_min}


[docs] def get_line_text(text_line: Union[str, Dict[str, any]]) -> Union[str, None]: """Convenience function to return the text string of a text line, regardless of whether text_line is a str, or a dictionary or a NoneType.""" if isinstance(text_line, str): return text_line elif text_line is None: return None elif isinstance(text_line, pdm.PageXMLTextLine): return text_line.text elif isinstance(text_line, dict): if 'text' in text_line: return text_line['text'] else: raise KeyError("text_line dict has no 'text' property.") else: raise TypeError("text_line must be a string or a dictionary with a 'text' property")
[docs] def compute_expected(observed: np.array) -> np.array: """Computes the contingency table of the expected values given a contingency table of the observed values.""" expected = np.array([ [ observed[0, :].sum() * observed[:, 0].sum() / observed.sum(), observed[0, :].sum() * observed[:, 1].sum() / observed.sum() ], [ observed[1, :].sum() * observed[:, 0].sum() / observed.sum(), observed[1, :].sum() * observed[:, 1].sum() / observed.sum() ] ]) return expected
[docs] def get_observed(token: str, target_counter: Counter, target_total: int, reference_counter: Counter, reference_total: int): """Computes the contingency table of the observed values given a target token, and target and reference analysers and counters.""" # a: word in target corpus t_target = target_counter[token] if token in target_counter else 0 # b: word in ref corpus t_ref = reference_counter[token] if token in reference_counter else 0 # c: other words in target corpus nt_target = target_total - t_target # d: other words in ref corpus nt_ref = reference_total - t_ref observed = np.array([ [t_target, t_ref], [nt_target, nt_ref] ]) return observed
[docs] def compute_log_likelihood(token: str, target_counter: Counter, target_total: int, reference_counter: Counter, reference_total: int) -> Tuple[float, str]: observed = get_observed(token, target_counter, target_total, reference_counter, reference_total) """Computes the log likelihood ratio for given a target token, and target and reference analysers and counters.""" expected = compute_expected(observed) sum_likelihood = 0 for i in [0, 1]: for j in [0, 1]: sum_likelihood += observed[i, j] * np.log((observed[i, j] + _SMALL) / (expected[i, j] + _SMALL)) return 2 * sum_likelihood, 'more' if observed[0, 0] > expected[0, 0] else 'less'
[docs] def get_keyness_vocab(target_counter: Counter, reference_counter: Counter) -> Set[str]: return set(list(target_counter.keys()) + list(reference_counter.keys()))
[docs] def compute_keyness(target_counter: Counter, reference_counter: Counter, vocab: Iterable[str] = None): """Compute the keyness score of each token in vocabulary for a given target counter and reference counter (available counters are 'all', 'start', 'mid' or 'end). The return value is a dictionary with two properties, 'less' and 'more', each with a Counter object. The 'less' counter contains the log likelihood ratio for tokens that are less common in the target counter than in the reference counter. The 'more' counter contains the log likelihood ratio for tokens that are more common in the target counter than in the reference counter. :param target_counter: the counter used for token frequencies of the target corpus (possible values: 'all', 'start', 'mid' or 'end') :type target_counter: str :param reference_counter: the counter used for token frequencies of the reference corpus (possible values: 'all', 'start', 'mid' or 'end') :param vocab: an optional vocabulary for which to compute keyness values. :type vocab: Iterable[str] """ log_likelihood = { 'less': Counter(), 'more': Counter() } if vocab is None: vocab = set(list(target_counter.keys()) + list(reference_counter.keys())) target_total = sum(target_counter.values()) reference_total = sum(reference_counter.values()) for token in vocab: ll, pref = compute_log_likelihood(token, target_counter, target_total, reference_counter, reference_total) log_likelihood[pref][token] = ll return log_likelihood
[docs] def compute_complement_keyness(target_analyser: LineAnalyser, target_counter: str): """Compute the keyness score of each token in vocabulary for a given target counter and its complement as the reference counter (available counters are 'all', 'start', 'mid' or 'end). The complement is the 'all' counter minus the target counter. The return value is a dictionary with two properties, 'less' and 'more', each with a Counter object. The 'less' counter contains the log likelihood ratio for tokens that are less common in the target counter than in the reference counter. The 'more' counter contains the log likelihood ratio for tokens that are more common in the target counter than in the reference counter. :param target_analyser: the target LineAnalyser :type target_analyser: LineAnalyser :param target_counter: the counter used for token frequencies of the target corpus (possible values: 'all', 'start', 'mid' or 'end') :type target_counter: str """ target_counter = target_analyser.freq[target_counter] reference_counter = Counter() for token in target_analyser.freq['all']: reference_counter[token] = target_analyser.freq['all'][token] - target_counter[token] return compute_keyness(target_counter, reference_counter)
[docs] class LineAnalyser: def __init__(self, word_break_chars: Union[str, Set[str]] = '-', ignorecase: bool = False, token_type: str = None): self.token_type = token_type self.word_break_chars = set(word_break_chars) if isinstance(word_break_chars, str) else word_break_chars self.ignorecase = ignorecase self.freq = { 'all': Counter(), 'start': Counter(), 'mid': Counter(), 'end': Counter(), } self.frac = { 'all': Counter(), 'start': Counter(), 'mid': Counter(), 'end': Counter(), } self.stats = { 'total_all_tokens': 0, 'total_mid_tokens': 0, 'total_end_tokens': 0, 'total_start_tokens': 0, 'total_lines': 0 } self.num_lines = 0 def __add__(self, other: LineAnalyser): merge = LineAnalyser(word_break_chars=self.word_break_chars, ignorecase=self.ignorecase, token_type=self.token_type) merge.freq['mid'] = self.freq['mid'] + other.freq['mid'] merge.freq['end'] = self.freq['end'] + other.freq['end'] merge.freq['all'] = self.freq['all'] + other.freq['all'] merge.freq['start'] = self.freq['start'] + other.freq['start'] merge.set_stats() return merge def __repr__(self): token_stats = json.dumps(self.num_tokens()) type_stats = json.dumps(self.num_types()) return f"{self.__class__.__name__}(num_{self.token_type}_tokens={token_stats}, " \ f"num_{self.token_type}_types={type_stats}, num_lines={self.num_lines})"
[docs] def num_types(self): """Returns descriptive statistics of the number of types per counter.""" return { 'all': len(self.freq['all']), 'start': len(self.freq['start']), 'mid': len(self.freq['mid']), 'end': len(self.freq['end']) }
[docs] def num_tokens(self): """Returns descriptive statistics of the number of tokens per counter.""" return { 'all': sum(self.freq['all'].values()), 'start': sum(self.freq['start'].values()), 'mid': sum(self.freq['mid'].values()), 'end': sum(self.freq['end'].values()) }
[docs] def reset_counters(self): """Reset all the counters.""" self.freq = { 'all': Counter(), 'start': Counter(), 'mid': Counter(), 'end': Counter(), } self.frac = { 'all': Counter(), 'start': Counter(), 'mid': Counter(), 'end': Counter(), } self.stats = { 'total_all_tokens': 0, 'total_mid_tokens': 0, 'total_end_tokens': 0, 'total_start_tokens': 0, 'total_lines': 0 } self.num_lines = 0
[docs] def set_stats(self): self.stats['total_all_tokens'] = sum(self.freq['all'].values()) self.stats['total_end_tokens'] = sum(self.freq['end'].values()) self.stats['total_mid_tokens'] = sum(self.freq['mid'].values()) self.stats['total_start_tokens'] = sum(self.freq['start'].values()) all_total = sum(self.freq['all'].values()) start_total = sum(self.freq['start'].values()) mid_total = sum(self.freq['mid'].values()) end_total = sum(self.freq['end'].values()) for token_type, all_freq in self.freq['all'].most_common(): self.frac['all'][token_type] = self.freq['all'][token_type] / all_total self.frac['start'][token_type] = self.freq['start'][token_type] / start_total self.frac['mid'][token_type] = self.freq['mid'][token_type] / mid_total self.frac['end'][token_type] = self.freq['end'][token_type] / end_total self.stats['total_lines'] = self.num_lines
def _iter_lines(self, text_lines: Iterable[any]): for text_line in text_lines: line_text = get_line_text(text_line) if line_text is None or len(line_text) == 0: continue if self.ignorecase is True: line_text = line_text.lower() yield line_text
[docs] def analyse_line_chars(self, text_lines: Iterable[any]): """Analyse the frequency of characters at the start, middle and end of a text line, for a given list of text lines.""" # print('analysing line characters') for line_text in self._iter_lines(text_lines): self.freq['all'].update(line_text) first_char = line_text[0] self.freq['start'].update([first_char]) if len(line_text) > 1: last_char = line_text[-1] self.freq['end'].update([last_char]) mid_chars = line_text[1:-1] self.freq['mid'].update(mid_chars) self.set_stats()
[docs] def analyse_line_words(self, text_lines: Iterable[any]): """Gather corpus statistics for a list of text lines on words at the start, middle and end of a text line. :param text_lines: an iterable for text lines (either strings or dictionaries with a 'text' property :type text_lines: Iterable[any] """ for line_text in self._iter_lines(text_lines): words = text_helper.get_line_words(line_text, word_break_chars=self.word_break_chars) start_words, mid_words, end_words = text_helper.split_line_words(words) self.freq['mid'].update(mid_words) self.freq['start'].update(start_words) self.freq['end'].update(end_words) self.freq['all'].update(words) self.num_lines += 1 self.set_stats()
[docs] def get_stats(self): """Return statistics on the frequency of characters occuring at the start, middle and end of a text line.""" stats = defaultdict(list) for token_type, all_freq in self.freq['all'].most_common(): try: stats['token_type'].append(token_type) stats['all_freq'].append(self.freq['all'][token_type]) stats['all_frac'].append(self.frac['all'][token_type]) stats['start_freq'].append(self.freq['start'][token_type]) stats['start_frac'].append(self.frac['start'][token_type]) stats['start_rel_frac'].append(self.frac['start'][token_type] / self.frac['all'][token_type]) stats['mid_freq'].append(self.freq['mid'][token_type]) stats['mid_frac'].append(self.frac['mid'][token_type]) stats['mid_rel_frac'].append(self.frac['mid'][token_type] / self.frac['all'][token_type]) stats['end_freq'].append(self.freq['end'][token_type]) stats['end_frac'].append(self.frac['end'][token_type]) stats['end_rel_frac'].append(self.frac['end'][token_type] / self.frac['all'][token_type]) except ZeroDivisionError: print(token_type, all_freq) raise return stats
[docs] class LineCharAnalyser(LineAnalyser): def __init__(self, text_lines: Iterable[any] = None, word_break_chars: Union[str, Set[str]] = '-', ignorecase: bool = False): """A character frequency analyser of a list of text lines. Four frequencies are calculated: - all_freq: the overall frequency of a character - start_freq: the frequency of a character as the first character in a text line - mid_freq: the frequency of a character in the middle of in a text line (so neither as the first nor last character of a line) - end_freq: the frequency of a character as the last character in a text line """ super().__init__(word_break_chars=word_break_chars, ignorecase=ignorecase, token_type='char') if text_lines is not None: self.analyse_line_chars(text_lines)
[docs] class LineWordAnalyser(LineAnalyser): def __init__(self, text_lines: Iterable[any] = None, word_break_chars: Union[str, Set[str]] = '-', ignorecase: bool = False): """A line word analyser class for building PageXML word-based corpus statistics. :param word_break_chars: a list of characters that can occur as word breaks. :type word_break_chars: str """ super().__init__(word_break_chars=word_break_chars, ignorecase=ignorecase, token_type='word') if text_lines is not None: self.analyse_line_words(text_lines)
[docs] def analyse_line_word_categories(self, text_lines: Iterable[str, pdm.PageXMLTextLine, Dict[str, any]], **kwargs) -> Dict[str, Counter]: """Collect counts on the frequency of different word types, e.g. numbers, title words, stopwords, etc. To get counts on stopwords, a stopword list must be passed. For information on what keyword arguments can be passed, see pagexml.analysis.text_stats.get_word_cat_stats. :param text_lines: an iterable with text lines :type text_lines: Iterable[str, PageXMLTextLine, Dict[str, any] """ cat_stats = defaultdict(Counter) for line_text in self._iter_lines(text_lines): words = text_helper.get_line_words(line_text, word_break_chars=self.word_break_chars) word_stats = get_word_cat_stats(words, **kwargs) for word_cat in word_stats: cat_stats[word_cat] += word_stats[word_cat] return cat_stats
[docs] def make_line_analyser(token_type: str, word_break_chars, ignorecase: bool = False): if token_type == 'char': return LineCharAnalyser(word_break_chars=word_break_chars, ignorecase=ignorecase) elif token_type == 'word': return LineWordAnalyser(word_break_chars=word_break_chars, ignorecase=ignorecase) else: raise ValueError(f"invalid token type: {token_type}, must be 'char' or 'word'")
[docs] def merge_analysers(line_analysers: List[LineAnalyser]) -> LineAnalyser: """Merge a list of LineAnalyser objects into a new, single LineAnalyser.""" token_types = set([la.token_type for la in line_analysers]) ignorecases = set([la.ignorecase for la in line_analysers]) word_break_chars = set([lbc for la in line_analysers for lbc in la.word_break_chars]) if len(token_types) > 1: raise TypeError(f"Cannot merge LineAnalysers of different token types: {token_types}") if len(ignorecases) > 1: raise TypeError(f"Cannot merge LineAnalysers with different ignorecases: {ignorecases}") la = line_analysers[0] merged_analyser = make_line_analyser(token_type=la.token_type, word_break_chars=word_break_chars) for analyser in line_analysers: for token in analyser.freq['all']: merged_analyser.freq['all'][token] += analyser.freq['all'][token] if token in analyser.freq['start']: merged_analyser.freq['start'][token] += analyser.freq['start'][token] if token in analyser.freq['mid']: merged_analyser.freq['mid'][token] += analyser.freq['mid'][token] if token in analyser.freq['end']: merged_analyser.freq['end'][token] += analyser.freq['end'][token] merged_analyser.num_lines = sum([la.num_lines for la in line_analysers]) merged_analyser.set_stats() return merged_analyser
[docs] class WordBreakDetector(LineWordAnalyser): def __init__(self, min_bigram_word_freq: int = 5, word_break_chars: Union[str, Set[str]] = '-', ignorecase: bool = False, lines: Iterable = None): """A line break detector class that uses corpus statistics and a configurable list of line break characters to determine, for two subsequent lines, whether the first line ends with a line break (a word broken off mid-word on the first line and continued on the second line.) :param min_bigram_word_freq: the minimum frequency of word bigrams to be considered common bigrams :type min_bigram_word_freq: int :param word_break_chars: a list of characters that can occur as line breaks. :type word_break_chars: str :param ignorecase: whether to ignore differences in case :type ignorecase: bool :param lines: an iterable for text lines (either strings or dictionaries with a 'text' property :type lines: Iterable[any] """ super().__init__(word_break_chars=word_break_chars, ignorecase=ignorecase) self.end_with_wbd_freq = Counter() self.start_with_wbd_freq = Counter() self.mid_bigram_freq = Counter() self.typical_start_merged_with = defaultdict(Counter) self.typical_end_merged_with = defaultdict(Counter) self.common_start_merged_with = defaultdict(Counter) self.common_end_merged_with = defaultdict(Counter) self.typical_merge_starts = set() self.typical_non_merge_starts = set() self.typical_merge_ends = set() self.typical_non_merge_ends = set() self.common_merge_starts = set() self.common_non_merge_starts = set() self.common_merge_ends = set() self.common_non_merge_ends = set() self.min_bigram_word_freq = min_bigram_word_freq if lines is not None: self.set_counters(lines) self.set_stats()
[docs] def reset_counters(self): """Reset all the counters.""" super().reset_counters() self.end_with_wbd_freq = Counter() self.start_with_wbd_freq = Counter() self.mid_bigram_freq = Counter() self.typical_start_merged_with = defaultdict(Counter) self.typical_end_merged_with = defaultdict(Counter) self.common_start_merged_with = defaultdict(Counter) self.common_end_merged_with = defaultdict(Counter) self.typical_merge_starts = set() self.typical_non_merge_starts = set() self.typical_merge_ends = set() self.typical_non_merge_ends = set() self.common_merge_starts = set() self.common_non_merge_starts = set() self.common_merge_ends = set() self.common_non_merge_ends = set()
[docs] def print_counter_stats(self): """Print overall statistics on the vocabulary derived from the analysed text lines.""" line_count = sum(self.freq["start"].values()) print("number of lines:", line_count) # print("number of non-empty lines:", sum(self.start_freq.values())) # print("number of empty lines:", line_count - sum(self.start_freq.values())) print("number of words per line:", sum(self.freq["all"].values()) / line_count) print(f'{"all:": <12}{len(self.freq["all"]): >10} types\t{sum(self.freq["all"].values()): >10} tokens') print(f'{"start:": <12}{len(self.freq["start"]): >10} types\t{sum(self.freq["start"].values()): >10} tokens') print(f'{"mid:": <12}{len(self.freq["mid"]): >10} types\t{sum(self.freq["mid"].values()): >10} tokens') print(f'{"end:": <12}{len(self.freq["end"]): >10} types\t{sum(self.freq["end"].values()): >10} tokens') print(f'{"mid bigrams:": <12}{len(self.mid_bigram_freq): >10} types' f'\t{sum(self.mid_bigram_freq.values()): >10} tokens') print(f'Number of typical merge line ends: {len(self.typical_merge_ends)}') print(f'Number of typical merge line starts: {len(self.typical_merge_starts)}') print(f'Number of common merge line ends: {len(self.common_merge_ends)}') print(f'Number of common merge line starts: {len(self.common_merge_starts)}')
[docs] def set_counters(self, lines: Iterable[any]): """Gather corpus statistics for a list of text lines on words at the start, middle and end of a text line, and on word bigrams in the middle of a line. :param lines: an iterable for text lines (either strings or dictionaries with a 'text' property :type lines: Iterable[any] """ print('Step 1: setting unigram counters') self._set_unigram_counters(lines) print('Step 2: setting bigram counter') self._set_bigram_counter(lines) print('Step 3: setting common merge counters') self._set_merged_with(lines) print('Step 4: setting typical line ends and starts') self._set_typical_start_ends() print('Step 5: setting common line ends and starts') self._set_common_start_ends()
def _set_unigram_counters(self, lines: Iterable[Union[str, Dict[str, str]]]): li = 0 for li, line in enumerate(lines): if line["text"] is None: continue words = text_helper.get_line_words(line["text"], word_break_chars=self.word_break_chars) start_words, mid_words, end_words = text_helper.split_line_words(words) start_with_wbd_words = [w for w in start_words if w[0] in self.word_break_chars] start_with_wbd_words = ['<LBC>' + w[:1] for w in start_with_wbd_words] end_with_wbd_words = [w for w in end_words if w[-1] in self.word_break_chars] end_with_wbd_words += [w[:-1] + '<LBC>' for w in end_with_wbd_words] self.freq['mid'].update(mid_words) self.freq['start'].update(start_words) self.freq['end'].update(end_words) self.start_with_wbd_freq.update(start_with_wbd_words) self.end_with_wbd_freq.update(end_with_wbd_words) self.freq['all'].update(words) print(li + 1, f'lines processed' f'\tall: {len(self.freq["all"]): >8} types' f'\t{sum(self.freq["all"].values()): >8} tokens') def _set_bigram_counter(self, lines: Iterable[Union[str, Dict[str, str]]]): li = 0 for li, line in enumerate(lines): if line["text"] is None: continue words = text_helper.get_line_words(line["text"], word_break_chars=self.word_break_chars) start_words, mid_words, end_words = text_helper.split_line_words(words) for i in range(0, len(mid_words) - 2): if self.freq['mid'][mid_words[i]] < self.min_bigram_word_freq or \ self.freq['mid'][mid_words[i + 1]] < self.min_bigram_word_freq: continue self.mid_bigram_freq.update([(mid_words[i], mid_words[i + 1])]) print(li + 1, f'lines processed\tall: {len(self.mid_bigram_freq)} bigrams') def _set_merged_with(self, lines: Iterable[Union[str, Dict[str, str]]], min_common_freq: int = 1000) -> None: prev_words = [] typical_start_words, typical_end_words = get_typical_start_end_words(self) for li, line in enumerate(lines): if line["text"] is None: continue words = text_helper.get_line_words(line["text"], word_break_chars=self.word_break_chars) if len(prev_words) == 0 or len(words) == 0: pass else: end_word = prev_words[-1] start_word = words[0] merge_word = end_word + start_word reduce_word = text_helper.remove_word_break_chars(end_word, start_word, self.word_break_chars) merge_word = merge_word if self.freq['mid'][merge_word] > self.freq['mid'][reduce_word] else reduce_word if len(merge_word) > 0 and merge_word[-1] in self.word_break_chars: # when the line start word ends with a hyphen, e.g. 'geval-' + 'len-' -> 'gevallen' if self.freq['mid'][merge_word[-1]] > self.freq['mid'][merge_word]: merge_word = merge_word[:-1] if end_word not in self.word_break_chars and self.freq['mid'][merge_word] > 1: if start_word in typical_start_words: self.typical_start_merged_with[start_word].update([(end_word, merge_word)]) if end_word in typical_end_words: self.typical_end_merged_with[end_word].update([(start_word, merge_word)]) if self.freq['start'][start_word] >= min_common_freq: self.common_start_merged_with[start_word].update([(end_word, merge_word)]) if self.freq['end'][end_word] >= min_common_freq: self.common_end_merged_with[end_word].update([(start_word, merge_word)]) prev_words = words def _set_typical_start_ends(self): typical_start_words, typical_end_words = get_typical_start_end_words(self) for end_word in sorted(typical_end_words, key=lambda w: sum(self.typical_end_merged_with[w].values())): merge_exist_frac = sum(self.typical_end_merged_with[end_word].values()) / self.freq['end'][end_word] if merge_exist_frac > 0.5: self.typical_merge_ends.add(end_word) elif merge_exist_frac < 0.05: self.typical_non_merge_ends.add(end_word) # merge_freq = sum([self.freq['mid'][merged_word] # for start_word, merged_word in self.typical_end_merged_with[end_word]]) # freqs = f"{self.freq['end'][end_word]: >8}{self.freq['mid'][end_word]: >8}{self.freq['all'][end_word]: >8}" # print(f"{end_word: <20}{freqs}{merge_exist_frac: >8.2f}{merge_freq: >8}") for start_word in sorted(typical_start_words, key=lambda w: sum(self.typical_start_merged_with[w].values())): merge_exist_frac = sum(self.typical_start_merged_with[start_word].values()) / self.freq['start'][start_word] if merge_exist_frac > 0.5: self.typical_merge_starts.add(start_word) elif merge_exist_frac < 0.05 and start_word.isupper() is False: self.typical_non_merge_starts.add(start_word) # merge_freq = sum([self.freq['mid'][merged_word] # for end_word, merged_word in self.typical_start_merged_with[start_word]]) # freqs = f"{self.freq['start'][start_word]: >8}{self.freq['mid'][start_word]: >8}" \ # f"{self.freq['all'][start_word]: >8}" # print(f"{start_word: <20}{freqs}{merge_exist_frac: >8.2f}{merge_freq: >8}") def _set_common_start_ends(self): for start_word in sorted(self.common_start_merged_with, key=lambda t: self.freq['mid'][t] / self.freq['start'][t]): merge_exist_frac = sum(self.common_start_merged_with[start_word].values()) / self.freq['start'][start_word] merge_freq = sum([self.freq['mid'][merged_word] for end_word, merged_word in self.common_start_merged_with[start_word]]) if start_word in self.typical_merge_starts or start_word in self.typical_non_merge_starts: continue if merge_exist_frac < 0.02: self.common_non_merge_starts.add(start_word) if merge_exist_frac < 0.2 or merge_freq < 100: continue self.common_merge_starts.add(start_word) for end_word in sorted(self.common_end_merged_with, key=lambda t: self.freq['mid'][t] / self.freq['end'][t]): merge_exist_frac = sum(self.common_end_merged_with[end_word].values()) / self.freq['end'][end_word] merge_freq = sum([self.freq['mid'][merged_word] for start_word, merged_word in self.common_end_merged_with[end_word]]) if end_word in self.typical_merge_ends or end_word in self.typical_non_merge_ends: continue if merge_exist_frac < 0.02: self.common_non_merge_ends.add(end_word) if merge_exist_frac < 0.2 or merge_freq < 100: continue self.common_merge_ends.add(end_word)
[docs] def show_word_break_context(wbd: WordBreakDetector, end_word: str, start_word: str, merge_word: str, match: str = None): last = f'{end_word: <15}{wbd.freq["mid"][end_word]: >8}{wbd.freq["end"][end_word]: >8}' first = f'\t{start_word: <15}{wbd.freq["start"][start_word]: >8}{wbd.freq["mid"][start_word]: >8}' \ f'{wbd.freq["all"][start_word]: >8}' merge = f'\t{merge_word: <15}{wbd.freq["all"][merge_word]: >8}' if match: print(f'{last}{first}{merge}\t{match}') else: print(f'{last}{first}{merge}')
[docs] def determine_word_break(curr_words: List[str], prev_words: List[str], wbd: WordBreakDetector = None, word_break_chars: Union[str, Set[str]] = '-', debug: bool = False) -> Tuple[bool, Union[str, None]]: """Determine for a current line and previous line (as lists of words) whether the first line ends with a line break. :param curr_words: a list of words for the current line to be merged with the previous line :type curr_words: List[str] :param prev_words: a list of words for the previous line to be merged with the current line :type prev_words: List[str] :return: a flag whether the previous line ends in a line break and the merged word composed of the previous line's last word and current line's first word (or None if the words should not be merged) :param wbd: a line break detector object :type wbd: WordBreakDetector :param word_break_chars: a list of characters that can occur as word breaks. :type word_break_chars: str :rtype: Union[str, None] :param debug: print debugging information """ if wbd is not None and wbd.word_break_chars is not None: word_break_chars = set([char for char in wbd.word_break_chars]) if len(prev_words) == 0 or len(curr_words) == 0: # print('includes non-word') return False, None end_word = prev_words[-1] start_word = curr_words[0] merge_word = end_word + start_word reduce_word = text_helper.remove_word_break_chars(end_word, start_word, word_break_chars) if wbd is None: return (True, reduce_word) if end_word[-1] in word_break_chars else (False, None) merge_word = merge_word if wbd.freq['all'][merge_word] > wbd.freq['all'][reduce_word] else reduce_word if debug: print(f"end: #{end_word}#\tstart: #{start_word}#") print('reduce_word', reduce_word) print('merge_word', merge_word) bigram_freq = wbd.mid_bigram_freq[(end_word, start_word)] if end_word[-1] in word_break_chars: bigram_freq = wbd.mid_bigram_freq[(end_word[:-1], start_word)] # print(end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], # (end_word[:-1], start_word), # merge_word, wbd.freq['mid'][merge_word], 'bigram_freq:', bigram_freq) if has_non_merge_word(wbd, end_word, start_word): if debug: print('has_none_merge_word', end_word, start_word) return False, None if end_start_are_bigram(wbd, merge_word, bigram_freq, factor=5): if debug: print('end_start_are_bigram', end_word, start_word) return False, None elif start_is_titleword(start_word): if end_start_are_hyphenated_compound(wbd, end_word, start_word, merge_word): merge_word = end_word + start_word # print('end_start_are_hyphenated_compound', end_word, start_word) if debug: print('end_start_are_hyphenated_compound', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word elif start_word_has_incorrect_titlecase(wbd, end_word, start_word, factor=10): # print('start_word_has_incorrect_titlecase', end_word, start_word) if debug: print('start_word_has_incorrect_titlecase', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word else: if debug: print('start_word_is_titleword', end_word, start_word) return False, None elif has_common_merge_end(wbd, end_word, start_word): # print('has_common_merge_end', end_word, start_word, merge_word) if debug: print('has_common_merge_end', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word elif has_word_break_symbol(wbd, end_word, start_word, merge_word): # print('has_word_break_symbol', end_word, start_word, merge_word) if debug: print('has_word_break_symbol', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word if end_start_are_bigram(wbd, merge_word, bigram_freq, factor=2): if debug: print('end_start_are_bigram', end_word, start_word) return False, None if end_is_common_word(wbd, end_word, common_freq=1000): if debug: print('end_is_common_word', end_word, start_word) return False, None elif merge_is_more_common(wbd, end_word, start_word, merge_word): # print('merge_is_more_common', end_word, start_word) if debug: print('merge_is_more_common', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word elif end_word[-1] in wbd.word_break_chars: # print('merge_word_break', end_word, start_word, merge_word) if debug: print('merge line break', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return True, merge_word # show_word_break_context(wbd, end_word, start_word, merge_word) else: if debug: print('OTHER', end_word, wbd.freq['mid'][end_word], start_word, wbd.freq['mid'][start_word], merge_word, wbd.freq['mid'][merge_word]) return False, None
[docs] def merge_is_more_common(wbd, end_word, start_word, merge_word): if wbd.freq['all'][merge_word] > wbd.freq['mid'][end_word] and \ wbd.freq['all'][merge_word] > wbd.freq['mid'][start_word]: return True elif is_non_mid_word(wbd, end_word, factor=5) and \ wbd.freq['all'][merge_word] > wbd.freq['mid'][start_word]: return True elif is_non_mid_word(wbd, start_word, factor=5) and \ wbd.freq['all'][merge_word] > wbd.freq['mid'][end_word]: return True elif wbd.freq['all'][merge_word] > 0: return True else: return False
[docs] def end_is_common_word(wbd: WordBreakDetector, end_word: str, common_freq: int = 100, debug: bool = False) -> bool: # return wbd.freq['all'][end_word] >= common_freq or wbd.freq['all'][start_word] >= common_freq if debug: print(end_word, wbd.freq['mid'][end_word], common_freq) return wbd.freq['mid'][end_word] >= common_freq
[docs] def has_word_break_symbol(wbd, end_word, start_word, merge_word): if end_word[-1] != '-': return False if wbd.freq['all'][merge_word] > wbd.freq['all'][end_word]: return True elif wbd.freq['all'][merge_word] > 0: return True elif wbd.freq['mid'][start_word] > wbd.freq['start'][start_word]: return False elif start_word.isdigit(): return False else: return True
[docs] def has_common_merge_end(wbd: WordBreakDetector, end_word: str, start_word: str) -> bool: if end_word in wbd.typical_merge_ends: return True elif start_word in wbd.typical_merge_starts: return True if start_word in wbd.common_non_merge_starts: return False else: return False
[docs] def is_non_mid_word(wbd: WordBreakDetector, word: str, factor: int = 5) -> bool: if wbd.freq['end'][word] > factor * wbd.freq['mid'][word]: return True if wbd.freq['start'][word] > factor * wbd.freq['mid'][word]: return True return False
[docs] def start_word_has_incorrect_titlecase(wbd: WordBreakDetector, end_word: str, start_word: str, factor: int = 10) -> bool: if start_word in wbd.common_non_merge_starts: return False if start_word.isupper(): return False if wbd.freq['all'][start_word] < factor and wbd.freq['all'][end_word] < factor: return False return is_non_mid_word(wbd, start_word) and is_non_mid_word(wbd, end_word)
[docs] def end_start_are_hyphenated_compound(wbd: WordBreakDetector, end_word: str, start_word: str, merge_word: str) -> bool: if start_word.isupper(): # entire start word is upper case, so no part of compound return False if end_word[0].isupper() and end_word[-1] == '-' and start_word[0].isupper(): # end word and start word are both in title case, and end word # ends with hyphen, so they're probably a hyphenated compound if wbd.freq['mid'][start_word] == 0 and wbd.freq['all'][merge_word] == 0 and \ wbd.freq['all'][end_word + start_word] == 0: # start_word is never observed in the middle of a line, so is likely # a broken off word, and is incorrectly title cased or # its sentence is not the correct one following end_word return False if wbd.freq['mid'][end_word] == 0 and wbd.freq['all'][merge_word] == 0 and \ wbd.freq['all'][end_word + start_word] == 0: # end_word is never observed in the middle of a line, so is likely # a broken off word, and start_word is incorrectly title cased or # its sentence is not the correct one following end_word return False else: return True else: return False
[docs] def start_is_titleword(start_word: str) -> bool: return start_word[0].isupper()
[docs] def end_start_are_bigram(wbd: WordBreakDetector, merge_word: str, bigram_freq: int, factor: int = 5) -> bool: # the bigram of end word and start word is much more frequent than their # merge, so treat as bigram # if frequency of merge_word is zero, bigram_freq should be at least 5 return bigram_freq > factor and bigram_freq > factor * wbd.freq['all'][merge_word]
[docs] def determine_word_break_typical_merge_end(wbd: WordBreakDetector, end_word: str, start_word: str, merge_word: str) -> bool: if end_word in wbd.typical_merge_ends: if wbd.freq['all'][merge_word] >= 10: return True elif wbd.freq['mid'][start_word] > 100 and end_word.endswith('-'): if wbd.freq['mid'][end_word[:-1]] > 100 and wbd.freq['mid'][end_word[:-1]] > 10 * wbd.freq['mid'][end_word]: return False elif wbd.freq['all'][merge_word] > 0: return True else: return False else: return True
[docs] def has_non_merge_word(wbd: WordBreakDetector, end_word: str, start_word: str, debug: bool = False) -> bool: if not re.search(r'\w', end_word): # end word is just punctuation, so don't merge if debug: print(f'end_word is punctuation: #{end_word}#') return True if not re.search(r'\w', start_word): # start word is just punctuation, so don't merge if debug: print(f'start_word is punctuation: #{start_word}#') return True if end_word == '-': # end word is just hyphen, so don't merge if debug: print(f'end_word is hyphen: #{end_word}#') return True if start_word == '-': # start word is just hyphen, so don't merge if debug: print(f'start_word is hyphen: #{start_word}#') return True elif end_word in wbd.typical_non_merge_ends: # start word is a non-merge word so don't merge if debug: print(f'end_word is non_merge_end: #{end_word}#') return True elif start_word in wbd.typical_non_merge_starts: # start word is a non-merge word so don't merge if debug: print(f'start_word is non_merge_start: #{start_word}#') return True else: return False
[docs] def get_typical_start_end_words(wbd: WordBreakDetector, threshold: float = 0.5) -> Tuple[Set[str], Set[str]]: typical_start_words = set() for end_word in wbd.freq['start']: if wbd.freq['start'][end_word] > 100 and \ wbd.freq['start'][end_word] / wbd.freq['all'][end_word] > threshold: typical_start_words.add(end_word) typical_end_words = set() for end_word in wbd.freq['end']: if wbd.freq['end'][end_word] > 100 and \ wbd.freq['end'][end_word] / wbd.freq['all'][end_word] > threshold: typical_end_words.add(end_word) return typical_start_words, typical_end_words
[docs] def get_words_per_line(lines: List[pdm.PageXMLTextLine], use_re_word_boundaries: bool = False): """Return a Counter of the number of words per line of a PageXML pagexml_doc object. :param lines: a list of PageXMLTextLine objects :type lines: List[PageXMLTextLine] :param use_re_word_boundaries: whether to split words of a line using RegEx word boundaries :type use_re_word_boundaries: bool :return: a counter of the number of words per line of a pagexml_doc :rtype: Counter """ words_per_line = Counter() if isinstance(lines, pdm.PageXMLTextRegion): lines = lines.get_lines() for line in lines: if line.text is None or line.text == '': words = [] elif use_re_word_boundaries: words = [w.replace(' ', '') for w in re.split(r'\b', line.text) if w != ' ' and w != ''] else: words = [w for w in line.text.split(' ')] # words_per_line.update([len(words)]) if len(words) in wpl_to_cat: wpl_cat = wpl_to_cat[len(words)] else: wpl_cat = max(wpl_cat_range.keys()) words_per_line.update([wpl_cat_range[wpl_cat]]) return words_per_line
[docs] def get_doc_words(pagexml_doc: pdm.PageXMLTextRegion, use_re_word_boundaries: bool = False) -> List[str]: """Return a list of words that are part of a PageXML pagexml_doc object. :param pagexml_doc: a PageXML document object :type pagexml_doc: PageXMLTextRegion :param use_re_word_boundaries: whether to split words of a line using RegEx word boundaries :type use_re_word_boundaries: bool :return: a list of all words on a pagexml_doc :rtype: List[str] """ lines = [line for line in pagexml_doc.get_lines() if line.text is not None] if use_re_word_boundaries: return [w.replace(' ', '') for line in lines for w in re.split(r'\b', line.text) if w != ' ' and w != ''] else: return [w for line in lines for w in line.text.split(' ')]
[docs] def get_word_cat_stats(words, stop_words=None, max_word_length: int = 30, word_length_bin_size: int = 5): """Calculate word type statistics for the word of a given PageXML scan. :param words: a list of words on a scan :type words: List[str] :param stop_words: a list of stopwords :type stop_words: List[str] :param max_word_length: the maximum length of words to be considered a regular word :type max_word_length: int (default 30 characters) :param word_length_bin_size: bin size for grouping words within a character length interval :type word_length_bin_size: int (default per 5 characters) """ puncs = set(string.punctuation) num_oversized_words = len([w for w in words if len(w) > max_word_length]) word_length_freq = Counter([len(w) for w in words if len(w) <= max_word_length]) word_cat_stats = { 'num_words': len(words), 'num_number_words': len([w for w in words if w.isdigit()]), 'num_title_words': len([w for w in words if w.istitle()]), 'num_non_title_words': len([w for w in words if w.istitle() is False]), 'num_stop_words': len([w for w in words if w in stop_words]) if stop_words is not None else None, 'num_punctuation_words': len([w for w in words if all(j in puncs for j in w)]), 'num_oversized_words': num_oversized_words } word_length_bin = word_length_bin_size word_cat_stats[f'num_words_length_{word_length_bin}'] = 0 for wl in range(1, max_word_length + 1): if wl > word_length_bin: word_length_bin += word_length_bin_size word_cat_stats[f'num_words_length_{word_length_bin}'] = 0 word_cat_stats[f'num_words_length_{word_length_bin}'] += word_length_freq[wl] return word_cat_stats