Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions print_file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#!/usr/bin/env python3
# @file: print_file.py
# @auth: sprax
# @date: 2016-07-27 01:31:55 Wed 27 Jul

# Sprax Lines 2016.07.25 Written with Python 3.5
'''print not necessarily ASCII text file to terminal'''

Expand All @@ -14,23 +18,24 @@ def read_file_lines(file):
my_print(line)
return lines


def print_lines(lines):
    '''Send every element of ``lines`` to my_print, one call per element,
    so that each one ends up on its own output line.'''
    for entry in lines:
        my_print(entry)



def my_print(line):
    '''Print the UTF-8 encoding of the string ``line``.

    The encoded value is a ``bytes`` object, so what reaches the output
    is its repr (for example ``b'caf\\xc3\\xa9'``) followed by a newline.
    '''
    utf8_bytes = line.encode("utf-8")
    print(utf8_bytes)


def uprint(*objects, sep=' ', end='\n', file=sys.stdout):
    '''Print ``objects`` to ``file`` without raising UnicodeEncodeError.

    Args:
        *objects: values to print; each one is converted with ``str()``
            before being escaped.
        sep: separator written between objects.
        end: text written after the last object.
        file: text stream to write to.  The default is the ``sys.stdout``
            object that existed when this function was defined.

    Characters that the stream's encoding cannot represent are written as
    backslash escapes instead of raising an error.  Streams that report no
    encoding at all (for example ``io.StringIO``) are written to directly.
    '''
    # Not every text stream has a usable encoding: io.StringIO reports
    # None, and ad-hoc file-like objects may lack the attribute entirely.
    enc = getattr(file, 'encoding', None)
    if enc is None or enc == 'UTF-8':
        print(*objects, sep=sep, end=end, file=file)
        return

    def escape(obj):
        # Round-trip through the target encoding so that anything it
        # cannot represent is replaced by a backslash escape.
        return str(obj).encode(enc, errors='backslashreplace').decode(enc)

    print(*map(escape, objects), sep=sep, end=end, file=file)
138 changes: 94 additions & 44 deletions subs/subs_cipher.py → subs/sub_cipher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#!/usr/bin/env python3
# @file: subs_cipher.py
# @auth: sprax
# @date: 2016-07-25 12:33:48 Mon 25 Jul

# Sprax Lines 2016.07.25 Written with Python 3.5
'''Class and driver script to solve simple substitution cipher from
'''
Class and driver script to solve simple substitution cipher from
a corpus and encoded text in separate text files.

Usage: python3 subs_cipher.py [cipher_file [corpus_file [verbosity]]]
Expand Down Expand Up @@ -30,19 +35,22 @@
import heapq
import re
import sys
from collections import defaultdict
from collections import Counter
from collections import Counter, defaultdict


class SubCipher:
'''Solver to infer a simple substitution cipher based on a large
corpus and small sample of encoded text. Assumes English for
boot-strapping off these four words: I, a, the, and.'''

def __init__(self, cipher_file, corpus_file, verbose):
self.cipher_file = cipher_file
self.corpus_file = corpus_file
self.cipher_lines = read_file_lines(cipher_file)
self.cipher_len_1, self.cipher_words = word_counts_short_and_long(cipher_file, 1)
self.corpus_len_1, self.corpus_words = word_counts_short_and_long(corpus_file, 1)
self.cipher_len_1, self.cipher_words = word_counts_short_and_long(
cipher_file, 1)
self.corpus_len_1, self.corpus_words = word_counts_short_and_long(
corpus_file, 1)
self.cipher_chars = count_chars_from_words(self.cipher_words)
self.corpus_chars = count_chars_from_words(self.corpus_words)
self.forward_map = defaultdict(int)
Expand All @@ -59,7 +67,7 @@ def solve(self):
substitution cipher, and a corpus of English text expected to contain
most of the words in the encoded text, decipher the encoded file.
Uses the SubCipher class.
'''
'''
# Find the words "a" and "I" (not crucial: it's okay if this fails).
self.find_a_and_i()

Expand All @@ -79,7 +87,8 @@ def solve(self):
self.find_words_from_ciphers()
if self.verbose > 0:
matches, misses, missing_words = self.count_decoded_words_in_corpus()
print("Distinct decoded words found in corpus: {} misses: {}".format(matches, misses))
print("Distinct decoded words found in corpus: {} misses: {}".format(
matches, misses))
if self.verbose > 1:
print("decoded words missing from corpus:", missing_words)
if self.verbose > 2:
Expand Down Expand Up @@ -115,18 +124,18 @@ def complete_map_using_char_counts(self):
of the mapping already guessed, it could be useful. Otherwise,
it is a last-chance, a priori best guess, because it uses
information from the corpus only, not from the cipher text.
'''
'''
if self.verbose > 0:
print("Guessing the rest of the cipher map based solely on letter counts.")
fdd = self.corpus_chars
idd = self.cipher_chars
forward_unmapped = (x for x in sorted(fdd, key=fdd.__getitem__,
reverse=True) if self.forward_map[x] == 0)
reverse=True) if self.forward_map[x] == 0)
inverse_unmapped = (x for x in sorted(fdd, key=idd.__getitem__,
reverse=True) if self.inverse_map[x] == 0)
reverse=True) if self.inverse_map[x] == 0)
for corpus_char, cipher_char in zip(forward_unmapped, inverse_unmapped):
self.assign(corpus_char, cipher_char)

def find_a_and_i(self):
'''Try to find the word "I" as the most common capitalized
single-letter word, and "a" as the most common lowercase
Expand Down Expand Up @@ -185,43 +194,49 @@ def find_words_from_ciphers(self):
The highest score wins. (That is, the decision is immediate, not
deferred to accumulate multiple scoring passes or backpropagating votes.)'''
num_words = len(self.corpus_words)
corpus = self.corpus_words.most_common(num_words) # Just try them all
inverse_pq = [] # priority = [num_unknown (updated on pop), -count, length]
corpus = self.corpus_words.most_common(num_words) # Just try them all
# priority = [num_unknown (updated on pop), -count, length]
inverse_pq = []
for ciph, count in self.cipher_words.items():
entry = [self.number_of_unknowns(ciph), -count, len(ciph), ciph]
heapq.heappush(inverse_pq, entry)

sentinel = '' # terminate the loop when the sentinel is seen a second time
# terminate the loop when the sentinel is seen a second time
sentinel = ''
while inverse_pq:
num_unk, neg_count, length, ciph = heapq.heappop(inverse_pq)
if num_unk == 0:
continue
num_unk, idx_unk = self.num_idx_unknown(ciph) # update (unknowns can become known)
num_unk, idx_unk = self.num_idx_unknown(
ciph) # update (unknowns can become known)
if num_unk == 1:
self.inverse_match_1_unknown(ciph, length, idx_unk, corpus)

elif num_unk > 1:
if ciph == sentinel:
print('Breaking from queue at: ', ciph, num_unk, -neg_count)
print('Breaking from queue at: ',
ciph, num_unk, -neg_count)
break
elif not sentinel:
# Set the sentinel and give each item still in the queue
# a chance to update its unknowns. Some may change to 1
# and get matched. Quit when the sentinel comes back to
# the front.
sentinel = ciph
print('Repush entry [', ciph, num_unk, -neg_count, '] to end of the queue')
print('Repush entry [', ciph, num_unk, -
neg_count, '] to end of the queue')
heapq.heappush(inverse_pq, [1000, 0, length, ciph])
elif self.verbose > 3:
print('\tAlready deciphered: ', num_unk, -neg_count, ciph
, self.decipher_word(ciph))
print('\tAlready deciphered: ', num_unk, -
neg_count, ciph, self.decipher_word(ciph))

def inverse_match_1_unknown(self, ciph, length, idx_unknown, corpus):
'''Try to match one cipher word with a single unknown against all
corpus words of same length. Accept the match that maximally
improves the total score (if there is any such match).'''
if self.verbose > 3:
print('Trying to match cipher word {} at index {}'.format(ciph, idx_unknown))
print('Trying to match cipher word {} at index {}'.format(
ciph, idx_unknown))
self.inverse_score = self.score_inverse_map()
ciph_char = ciph[idx_unknown]
max_score = 0
Expand All @@ -235,22 +250,29 @@ def inverse_match_1_unknown(self, ciph, length, idx_unknown, corpus):
continue # skip over the single unknown
if word[idx] != deciphered[idx]:
break # break on the first known mismatch
else: # all known chars matched, hole excluded
# Compute the total score that would result from accepting this mapping
# all known chars matched, hole excluded
else:
# Compute the total score that would result from accepting
# this mapping
word_char = word[idx_unknown]
self.inverse_map[ciph_char] = word_char # create temporary inverse mapping
try_score = self.score_inverse_map() # compute score with this mapping
self.inverse_map[ciph_char] = 0 # delete temporary inverse mapping
# create temporary inverse mapping
self.inverse_map[ciph_char] = word_char
# compute score with this mapping
try_score = self.score_inverse_map()
# delete temporary inverse mapping
self.inverse_map[ciph_char] = 0
if max_score < try_score:
max_score = try_score
max_word = word

if max_score > self.inverse_score:
self.update_mapping_on_better_score(ciph, idx_unknown, max_word, max_score)
self.update_mapping_on_better_score(
ciph, idx_unknown, max_word, max_score)

def update_mapping_on_better_score(self, ciph, idx_unknown, max_word, max_score):
'''Update forward and inverse maps and, if verbose > 0, show why.
For now this method assumes inverse-map scoring, but could be generalized
For now this method assumes inverse-map scoring,
but could be generalized
for forward-mapping, partial-word matches (stemming), and so forth.
This method exists mainly to provide a trace of the solver's progress.'''
ciph_char = ciph[idx_unknown]
Expand All @@ -273,11 +295,15 @@ def update_mapping_on_better_score(self, ciph, idx_unknown, max_word, max_score)
self.assign(max_char, ciph_char)

def score_inverse_map(self):
'''score based on totality of deciphered cipher words matching corpus words'''
'''
score based on totality of deciphered cipher words matching corpus
words
'''
score_total = 0
for ciph, ciph_count in self.cipher_words.items():
word = self.decipher_word(ciph)
word_count = 1 if self.corpus_words[word] else 0 # 0 if not in corpus
# 0 if not in corpus
word_count = 1 if self.corpus_words[word] else 0
score = word_count * ciph_count * len(ciph)
if self.verbose > 5:
print(" {:9}\t {} => {}".format(score, ciph, word))
Expand All @@ -288,14 +314,15 @@ def count_decoded_words_in_corpus(self):
'''returns three things: the hit and miss counts, and a list of
all the decoded cipher text words missing from the corpus:
(1) hits: the number of distinct encoded words that, decoded with the
current best guess at the cipher key, match some word found in the corpus, and
current best guess at the cipher key, match some word found in the
corpus, and
(2) misses: the number that do not.
Note that the set of 'words' may include such strings as "t" and
"ll", which result from splitting "don't" and "you'll" on the
apostrophe. No assumption is made of proper grammar or orthography
apostrophe. No assumption is made of proper grammar or orthography
(3) missing words: a list containing all decoded cipher text
words not found in the corpus, in lexicographically sorted order
'''
'''
num_ciphers = len(self.cipher_len_1) + len(self.cipher_words)
num_matches = 0
missing = []
Expand All @@ -314,7 +341,9 @@ def count_decoded_words_in_corpus(self):
return num_matches, num_ciphers - num_matches, sorted(missing)

def number_of_unknowns(self, ciph):
    '''Count the characters of ``ciph`` that are still undeciphered.

    A cipher character counts as unknown while its entry in
    ``self.inverse_map`` compares equal to 0.  A character that occurs
    several times in ``ciph`` is counted once per occurrence.
    '''
    unknown_total = 0
    for ciph_char in ciph:
        if self.inverse_map[ciph_char] == 0:
            unknown_total += 1
    return unknown_total

def num_idx_unknown(self, ciph):
Expand Down Expand Up @@ -376,7 +405,8 @@ def print_forward_map(self, outfile=sys.stdout):
print(word_char, "->", ciph_char if ciph_char else ' ', file=outfile)

def print_deciphered_lines(self, outfile=sys.stdout):
'''Print the decoded contents of the original cipher file to the console
'''Print the decoded contents of the original cipher file to the
console
(default) or a file'''
for line in self.cipher_lines:
text = self.decipher_text(line)
Expand All @@ -392,30 +422,36 @@ def write_forward_cipher_key(self, path):
out.close()

def write_deciphered_text(self, path):
    '''Write the decoded contents of the original cipher file into a new
    file.

    Args:
        path: destination file.  It is opened in text mode ``'w'``, so an
            existing file at that path is truncated.
    '''
    # The with-statement closes the file when the block exits, so no
    # explicit close() call is needed.
    with open(path, 'w') as out:
        self.print_deciphered_lines(out)


def uprint(*objects, sep=' ', end='\n', outfile=sys.stdout):
    '''Print non-ASCII Unicode characters in a safe (but possibly ugly) way,
    even in a Windows command terminal.

    Unicode-enabled terminals such as on Mac or KDE have no problem, nor
    do most IDEs, but calling Python's built-in print to print such
    characters (e.g., an em-dash) from a Windows cmd or PowerShell
    terminal causes errors such as:

        UnicodeEncodeError: 'charmap' codec can't encode characters in
        position 32-33: character maps to <undefined>

    Args:
        *objects: values to print; each one is converted with ``str()``
            before being escaped.
        sep: separator written between objects.
        end: text written after the last object.
        outfile: text stream to write to.  The default is the
            ``sys.stdout`` object that existed when this function was
            defined.

    Characters that the stream's encoding cannot represent are written as
    backslash escapes.  Streams that report no encoding at all (for
    example ``io.StringIO``) are written to directly.
    '''
    # Not every text stream has a usable encoding: io.StringIO reports
    # None, and ad-hoc file-like objects may lack the attribute entirely.
    enc = getattr(outfile, 'encoding', None)
    if enc is None or enc == 'UTF-8':
        print(*objects, sep=sep, end=end, file=outfile)
        return

    def enc_dec(obj):
        # Round-trip through the target encoding so that anything it
        # cannot represent is replaced by a backslash escape.
        return str(obj).encode(enc, errors='backslashreplace').decode(enc)

    print(*map(enc_dec, objects), sep=sep, end=end, file=outfile)


def char_range_inclusive(first, last, step=1):
    '''Yield the characters from ``first`` to ``last``, both inclusive.

    Works in any character set, depending only on ord() and chr().

    Args:
        first: single character at which the range starts.
        last: single character at which the range ends; it is yielded
            whenever ``step`` lands on it.
        step: code-point increment between yielded characters.  A
            negative value walks downward from ``first`` to ``last``.

    Nothing is yielded when ``last`` lies in the opposite direction from
    ``step``.  A ``step`` of 0 raises ValueError (from ``range``) when
    iteration begins.
    '''
    # range() excludes its stop value, so place the stop one code point
    # past ``last`` in the direction of travel; always adding 1 would cut
    # a descending range short.
    stop = ord(last) + (1 if step > 0 else -1)
    for code_point in range(ord(first), stop, step):
        yield chr(code_point)


Expand All @@ -427,8 +463,12 @@ def read_file_lines(path):
lines.append(line.rstrip())
return lines


def count_words(path):
'''Returns a Counter that has counted all ASCII-only words found in a text file.'''
'''
Returns a Counter that has counted all ASCII-only words found in a text
file.
'''
rgx_match = re.compile(r"[A-Za-z]+")
counter = Counter()
with open(path, 'r') as text:
Expand All @@ -438,10 +478,14 @@ def count_words(path):
counter.update(words)
return counter


def word_counts_short_and_long(path, max_short_len):
'''Returns two Counters containing all the ASCII-only words found in a text file.
'''
Returns two Counters containing all the ASCII-only words
found in a text file.
The first counter counts only words up to length max_short_len, as-is.
The second counter contains all the longer words, but lowercased.'''
The second counter contains all the longer words, but lowercased.
'''
rgx_match = re.compile(r"[A-Za-z]+")
short_counter = Counter()
other_counter = Counter()
Expand All @@ -459,6 +503,7 @@ def word_counts_short_and_long(path, max_short_len):
other_counter.update(other)
return short_counter, other_counter


def count_chars_from_words(word_counter):
'''Count chars from all words times their counts'''
char_counter = Counter()
Expand All @@ -467,18 +512,23 @@ def count_chars_from_words(word_counter):
char_counter.update(item[0])
return char_counter


def solve_simple_substition_cipher(cipher_file, corpus_file, verbose):
    '''
    Decipher a file of ordinary English sentences that was encoded with a
    simple substitution cipher, using a corpus of English text expected to
    contain most of the words in the encoded text.

    Builds a SubCipher from the three arguments and runs its solve()
    method.
    '''
    SubCipher(cipher_file, corpus_file, verbose).solve()


def main():
'''Get file names for cipher and corpus texts and call
solve_simple_substition_cipher.'''
'''
Get file names for cipher and corpus texts and call
solve_simple_substition_cipher.
'''

# simple, inflexible arg parsing:
argc = len(sys.argv)
Expand Down