In this homework, we explore the Lempel-Ziv (LZ) probability assignment, which is based on the LZ78 compression scheme (Ziv and Lempel, 1978).
We will first implement the LZ probability assignment and verify its mean universality. Then, we will plug the probability assignment into the arithmetic encoder to compress some real-world data.
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
Write a function
prob_LZ78_seq(string, char_to_index)
where

- string: str, an input string of symbols;
- char_to_index: dict, a mapping from characters to indices;
- dist_array: the returned vertical stack of LZ sequential probability assignments; that is, dist_array[i] $=q_{\text{LZ}}(\cdot|x^{i})$ for $i=0,\ldots,n$, if the input string is $x^n=(x_1,\ldots,x_{n})$.

For example, the function should produce the following outputs:
>>> A = '10111111'
>>> prob_LZ78_seq(A, chars_to_dict('01'))
array([[ 0.5 , 0.5 ],
[ 0.33333333, 0.66666667],
[ 0.5 , 0.5 ],
[ 0.5 , 0.5 ],
[ 0.4 , 0.6 ],
[ 0.33333333, 0.66666667],
[ 0.5 , 0.5 ],
[ 0.33333333, 0.66666667],
[ 0.25 , 0.75 ]])
>>> B = '0010110111'
>>> prob_LZ78_seq(B, chars_to_dict('01'))
array([[ 0.5 , 0.5 ],
[ 0.66666667, 0.33333333],
[ 0.5 , 0.5 ],
[ 0.75 , 0.25 ],
[ 0.33333333, 0.66666667],
[ 0.5 , 0.5 ],
[ 0.8 , 0.2 ],
[ 0.25 , 0.75 ],
[ 0.33333333, 0.66666667],
[ 0.5 , 0.5 ],
[ 0.83333333, 0.16666667]])
>>> prob_LZ78_seq(B, chars_to_dict('012'))
array([[ 0.33333333, 0.33333333, 0.33333333],
[ 0.6 , 0.2 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.71428571, 0.14285714, 0.14285714],
[ 0.2 , 0.6 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.77777778, 0.11111111, 0.11111111],
[ 0.14285714, 0.71428571, 0.14285714],
[ 0.2 , 0.6 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.81818182, 0.09090909, 0.09090909]])
>>> C = '00121212102101210'
>>> prob_LZ78_seq(C, chars_to_dict('012'))
array([[ 0.33333333, 0.33333333, 0.33333333],
[ 0.6 , 0.2 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.71428571, 0.14285714, 0.14285714],
[ 0.55555556, 0.11111111, 0.33333333],
[ 0.45454545, 0.27272727, 0.27272727],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.38461538, 0.23076923, 0.38461538],
[ 0.2 , 0.6 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.33333333, 0.2 , 0.46666667],
[ 0.14285714, 0.71428571, 0.14285714],
[ 0.6 , 0.2 , 0.2 ],
[ 0.33333333, 0.33333333, 0.33333333],
[ 0.29411765, 0.17647059, 0.52941176],
[ 0.11111111, 0.77777778, 0.11111111],
[ 0.71428571, 0.14285714, 0.14285714],
[ 0.2 , 0.6 , 0.2 ]])
>>> D = 'AGTTTTCGTAACGTT'
>>> prob_LZ78_seq(D, chars_to_dict('AGTC'))
array([[ 0.25 , 0.25 , 0.25 , 0.25 ],
[ 0.57142857, 0.14285714, 0.14285714, 0.14285714],
[ 0.4 , 0.1 , 0.4 , 0.1 ],
[ 0.30769231, 0.07692308, 0.30769231, 0.30769231],
[ 0.25 , 0.25 , 0.25 , 0.25 ],
[ 0.25 , 0.0625 , 0.25 , 0.4375 ],
[ 0.14285714, 0.14285714, 0.14285714, 0.57142857],
[ 0.21052632, 0.05263158, 0.21052632, 0.52631579],
[ 0.25 , 0.25 , 0.25 , 0.25 ],
[ 0.18181818, 0.04545455, 0.31818182, 0.45454545],
[ 0.25 , 0.25 , 0.25 , 0.25 ],
[ 0.28 , 0.04 , 0.28 , 0.4 ],
[ 0.25 , 0.14285714, 0.25 , 0.35714286],
[ 0.14285714, 0.14285714, 0.14285714, 0.57142857],
[ 0.25 , 0.25 , 0.25 , 0.25 ],
[ 0.22580645, 0.12903226, 0.32258065, 0.32258065]])
Recall that if a sequence $x^n$ can be parsed as $y^cy_{\text{last}}$, i.e., into $c$ complete words followed by a (possibly incomplete) last word, then $$ q_{\text{LZ}}(x^n) =q_{\text{LZ}}(y^cy_{\text{last}}) =\prod_{i=1}^c q_{\text{LZ}}(y_i|y^{i-1}) \cdot q_{\text{LZ}}(y_{\text{last}}|y^c), $$ where $$ q_{\text{LZ}}(y_i|y^{i-1}) = \frac{1}{(M-1)i+1}, $$ so that for a binary alphabet ($M=2$) the product over the complete words equals $\frac{1}{(c+1)!}$, and $$ q_{\text{LZ}}(y_{\text{last}}|y^c) = \frac{\text{# of words with prefix }y_{\text{last}}\text{ in the word pool}}{\text{# of words in the word pool}}. $$
However, in this problem, you are required to find $q_{\text{LZ}}(\cdot|x^{j-1})$ for each $j=1,\ldots,n+1$.
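To make the formula concrete, consider $A=$ '10111111' from the examples above: it parses into the words '1', '0', '11', '111' followed by the incomplete last word '1', so $c=4$ and, with the convention (used in the code below) that the word pool is initialized with one virtual word per symbol, $$ q_{\text{LZ}}(A) = \frac{1}{2}\cdot\frac{1}{3}\cdot\frac{1}{4}\cdot\frac{1}{5}\cdot\frac{4}{6} = \frac{1}{180}, $$ since $4$ of the $6$ words in the pool ('1', '11', '111', and the virtual '1') have prefix '1'. This is consistent with $\log_2 q_{\text{LZ}}(A) = -\log_2 180 \approx -7.4919$ computed in part (b) below.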
# You may find this function useful
def chars_to_dict(chars):
    chars = sorted(list(set(chars)))
    char_to_index = {x: i for i, x in enumerate(chars)}
    return char_to_index
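For instance, note that the characters are sorted before indexing, so the columns of dist_array follow the sorted alphabet:
>>> chars_to_dict('AGTC')
{'A': 0, 'C': 1, 'G': 2, 'T': 3}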
class Node(object):
    def __init__(self, data=None):
        self.data = data  # for internal nodes: a vector of symbol counts
        self.children = []

    def add_child(self):
        self.children.append(Node())

    def add_children(self, num_child):
        for _ in range(num_child):
            self.add_child()
def prob_LZ78_seq(string, char_to_index):
    """
    - string: str, a sequence to be parsed
    - char_to_index: dict, a dictionary which maps a character to its index
    """
    num_abc = len(char_to_index)
    dist_array = np.zeros((len(string)+1, num_abc))  # i-th row is the sequential distribution q_LZ(.|x^i)
    root = Node(None)
    root.add_children(num_abc)
    root.data = np.ones(num_abc)
    node = root
    for i in range(len(string)):
        dist_array[i] = node.data
        new_symbol_index = char_to_index[string[i]]
        next_node = node.children[new_symbol_index]
        node.data[new_symbol_index] += num_abc - 1
        if next_node.children:  # next_node is an internal node: continue the current word
            node = next_node
        else:                   # next_node is a leaf: the current word is complete
            next_node.add_children(num_abc)
            next_node.data = np.ones(num_abc)
            node = root
    dist_array[-1] = node.data  # q_{LZ}(x_{n+1}|x^n)
    dist_array /= dist_array.sum(axis=1)[:, np.newaxis]  # normalize each row
    return dist_array
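A quick sanity check (a sketch): the output must have $n+1$ rows, each of which is a valid conditional pmf.
out = prob_LZ78_seq('10111111', chars_to_dict('01'))
assert out.shape == (9, 2)                # n+1 rows for a length-8 input
assert np.allclose(out.sum(axis=1), 1.0)  # every row sums to 1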
A = '10111111'
B = '0010110111'
C = '00121212102101210'
D = 'AGTTTTCGTAACGTT'
prob_LZ78_seq(A, chars_to_dict('01'))
prob_LZ78_seq(B, chars_to_dict('01'))
prob_LZ78_seq(B, chars_to_dict('012'))
prob_LZ78_seq(C, chars_to_dict('012'))
prob_LZ78_seq(D, chars_to_dict('AGTC'))
Based on (a), write a function
log2_prob_LZ78(string, char_to_index)
that outputs $\log_2 q_{\text{LZ}}(x^{n})$ given the input string $x^n$. For example,
>>> A = '10111111'
>>> log2_prob_LZ78(A, chars_to_dict('01'))
-7.4918530963296748
>>> B = '0010110111'
>>> log2_prob_LZ78(B, chars_to_dict('01'))
-6.9068905956085178
>>> log2_prob_LZ78(B, chars_to_dict('012'))
-9.8841705191084355
>>> C = '00121212102101210'
>>> log2_prob_LZ78(C, chars_to_dict('012'))
-22.716467197858318
>>> D = 'AGTTTTCGTAACGTT'
>>> log2_prob_LZ78(D, chars_to_dict('AGTC'))
-33.988292978999269
Be careful with underflow of small probabilities (or overflow).
def log2_prob_LZ78(string, char_to_index):
    """
    - string: str, a sequence to be parsed
    - char_to_index: dict, a dictionary which maps a character to its index
    """
    dist_array = prob_LZ78_seq(string, char_to_index)
    # Sum log-probabilities instead of multiplying probabilities,
    # which would underflow for long strings.
    log2prob = np.sum([np.log2(dist_array[i, char_to_index[string[i]]]) for i in range(len(string))])
    return log2prob
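To see why the hint above matters, here is a minimal sketch (with arbitrary numbers) of how the direct product underflows while the sum of logs does not:
probs = np.full(100000, 0.5)   # 10^5 symbols, each with probability 1/2
print(np.prod(probs))          # 0.0 -- underflows in float64
print(np.sum(np.log2(probs)))  # -100000.0 -- exact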
A, log2_prob_LZ78(A, chars_to_dict('01'))
B, log2_prob_LZ78(B, chars_to_dict('01'))
B, log2_prob_LZ78(B, chars_to_dict('012'))
C, log2_prob_LZ78(C, chars_to_dict('012'))
D, log2_prob_LZ78(D, chars_to_dict('AGTC'))
In this problem, we will verify the mean universality of the LZ probability assignment implemented in Problem 1. In class we learned that $$ \lim_{n\to\infty}\frac{1}{n}\mathbb{E}_{X^n\sim \mathsf{P}}\left[\log_2\frac{p(X^n)}{q_{\text{LZ}}(X^n)}\right]=0 $$ for any stationary ergodic measure $\mathsf{P}$. To verify this, we draw $(X_{1}(i),\ldots,X_{n}(i))\sim \mathsf{P}$ independently for $i=1,\ldots,N$ and use the sample average $$ \frac{1}{N}\sum_{i=1}^N \log_2\frac{p(X^n(i))}{q_{\text{LZ}}(X^n(i))}\approx \mathbb{E}_{X^n\sim \mathsf{P}}\left[\log_2\frac{p(X^n)}{q_{\text{LZ}}(X^n)}\right]. $$ For $N$ sufficiently large, the sample average on the LHS is close to the expected redundancy on the RHS. We will consider the following two simple probability model classes.
a) $\mathcal{P}_a$ = {Bernoulli($p$) i.i.d. processes $X_i\sim \text{Bern}(p)$: $p\in[0,1]$}
b) $\mathcal{P}_b$ = {binary symmetric Markov chains $X^n$ with transition matrix $$ P=\begin{bmatrix} 1-p & p\\ p & 1-p \end{bmatrix} $$ and initial distribution $\pi=[1/2,1/2]$: $p\in[0,1]$}
For each model class, plot against $\log_{10} n$ the base-10 logarithm of the worst-case normalized redundancy $$ \frac{1}{n}\max_{p}\frac{1}{N}\sum_{i=1}^N \log_2\frac{p(X^n(i))}{q_{\text{LZ}}(X^n(i))}, $$ where the maximum is taken over $p \in \{0.02,0.04,\ldots,0.5\}$, for $n\in \{10, 50, 100, 500, 1000, 5000, 10000\}$, while fixing $N=50$.
Plot the two curves for each model class in the same plot. For which model does the LZ probability give a faster convergence?
Hint: You may want to implement the following functions first.
draw_one_sample_from(pmf) outputs a random number from {0,...,len(pmf)-1} according to the given pmf.
For example,
>>> draw_one_sample_from([0.8, 0.2])
0
generate_mc(n, P, pinit) outputs (x, log2_prob), where
x is a length-n Markov chain sequence drawn according to the given transition matrix P starting from the initial distribution pinit (uniform if omitted), and log2_prob is the log-probability of the sequence to the base 2.
For example,
>>> generate_mc(10, np.array([[0.3,0.7],[0.7,0.3]]))
(array([1, 0, 0, 1, 0, 0, 0, 0, 0, 1]), -11.743120662150064)
Note that you can generate an iid source using the function generate_mc.
def draw_one_sample_from(pmf):
    pmf = np.array(pmf)
    pmf = pmf / sum(pmf)  # normalize, in case the input does not sum exactly to 1
    return np.random.choice(range(len(pmf)), p=pmf)
def generate_mc(n, P, pinit=None):
    '''
    - n: length of output sequence
    - P: transition matrix
    - pinit: initial distribution (uniform if None)
    '''
    x = np.zeros(n, dtype=int)  # np.int is deprecated; use the built-in int
    log2_prob = 0
    if pinit is None:
        x[0] = np.random.randint(P.shape[0])
        p0 = 1 / P.shape[0]
    else:
        x[0] = draw_one_sample_from(pinit)
        p0 = pinit[x[0]]
    log2_prob += np.log2(p0)
    for i in range(n - 1):
        x[i+1] = draw_one_sample_from(P[x[i], :])
        log2_prob += np.log2(P[x[i], x[i+1]])
    return x, log2_prob
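As noted in the hint, an i.i.d. Bern($p$) source is the special case of a Markov chain whose rows are identical; a minimal sketch with an arbitrary $p=0.3$:
p = 0.3
P_iid = np.array([[1-p, p],
                  [1-p, p]])  # identical rows: the next symbol ignores the current one
x, log2_prob = generate_mc(1000, P_iid, pinit=[1-p, p])
print(x[:20], log2_prob)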
def mean_redundancy(nlist, plist, N=50, model="iid"):
    M = 2  # let's focus on the binary alphabet
    red_array = np.zeros((len(plist), len(nlist)))
    for i, p in enumerate(plist):
        if model == "iid":
            P = np.array([[1-p, p], [1-p, p]])  # identical rows: an i.i.d. Bern(p) source
            pinit = [1-p, p]
        elif model == "bsmc":
            P = np.array([[1-p, p], [p, 1-p]])
            pinit = None  # uniform initial distribution
        else:
            raise ValueError("Input a valid model class, either 'iid' or 'bsmc'")
        for j, n in enumerate(nlist):
            print('model={}, n={}, {}-th p={}'.format(model, n, i+1, p))
            redundancy = 0
            for _ in range(N):
                x, log2prob = generate_mc(n, P, pinit)
                xstr = ''.join([str(xi) for xi in x])
                log2probLZ78 = log2_prob_LZ78(xstr, chars_to_dict([str(k) for k in range(M)]))
                redundancy += log2prob - log2probLZ78
            red_array[i, j] = redundancy / N / n  # normalized sample-average redundancy
    return red_array
nlist = [10, 50, 100, 500, 1000, 5000, 10000]
plist = np.arange(0.02, 0.51, 0.02)
%time red_array_iid = mean_redundancy(nlist, plist, model="iid")
%time red_array_bsmc = mean_redundancy(nlist, plist, model="bsmc")
plt.plot(np.log10(nlist), np.log10(red_array_iid.max(axis=0)), marker='x', label='iid')
plt.plot(np.log10(nlist), np.log10(red_array_bsmc.max(axis=0)), marker='x', label='bsmc')
plt.title('Worst-case expected redundancy for iid and bsmc')
plt.xlabel(r'$\log_{10}$(length of sequence $n$)')
plt.ylabel(r'$\log_{10}$(expected redundancy [bits])')
plt.legend(loc=1)
Since the bsmc model has memory that the iid model lacks, it is the richer class and is harder to learn; as one would expect, the LZ probability converges faster for the iid model.
In this problem, we will implement a universal compression scheme based on the LZ probability assignment. In the following, we provide complete Python 3 code for a simple offline version of the arithmetic encoder. Later in this course, you will learn this coding scheme in detail. Briefly speaking, given a sequential probability model $q(\cdot|x^{i-1})$, the arithmetic encoder maps a string $x^n$ to the binary representation of a real number between 0 and 1, and it requires only $\left\lceil\log_2 \frac{1}{q(x^n)}\right\rceil +1$ bits to encode $x^n$. A key advantage of arithmetic coding is that it separates modeling from compression. Hence, by plugging any (universal) sequential probability model into the arithmetic encoder, we obtain a corresponding (universal) compression scheme!

For simplicity, the following version of the arithmetic encoder takes a stack of sequential distributions as input. Hence, in this problem, to encode a string $x^n$ using the LZ probability assignment $q_{\text{LZ}}$, instead of providing a sequential probability model, you first compute the stack of sequential distributions by scanning the sequence once, and then pass it to the arithmetic encoder.
(NOTE. Our resulting compression scheme (LZ probability assignment + arithmetic encoder) is different from the original LZ78 compression algorithm. In the original algorithm, the code bits can be generated directly from the parsing.)
'''
**************************
**************************
*** ARITHMETIC ENCODER ***
**************************
**************************
'''
def arithmetic_enc_offline(string, char_to_index, pmfs, STATE_SIZE=32):
    ZERO = 0
    ONE = (1 << STATE_SIZE) - 1
    ONE_FOURTH = 1 << (STATE_SIZE - 2)
    HALF = ONE_FOURTH * 2
    THREE_FOURTH = HALF + ONE_FOURTH
    # Mask of STATE_SIZE ones, i.e., 111...111.
    MASK = ONE
    # Mask of the top bit at width STATE_SIZE, i.e., 100...000.
    TOP_MASK = HALF
    # Mask of the second highest bit at width STATE_SIZE, i.e., 010...000.
    SECOND_MASK = TOP_MASK >> 1

    low, high = ZERO, ONE
    low_float, high_float = 0.0, 1.0
    encoded_bits = ''
    num_pending_bits = 0
    for i, symbol in enumerate(string):
        curr_cmf = np.hstack(([0], np.cumsum(pmfs[i])))
        curr_idx = char_to_index[symbol]
        p_prev_symb = high - low + 1  # current range
        high = int(low + p_prev_symb * curr_cmf[curr_idx+1] / curr_cmf[-1]) - 1
        low = int(low + p_prev_symb * curr_cmf[curr_idx] / curr_cmf[-1])
        # To check the validity of the logic
        p_prev_symb_float = high_float - low_float
        high_float = low_float + p_prev_symb_float * curr_cmf[curr_idx+1] / curr_cmf[-1]
        low_float = low_float + p_prev_symb_float * curr_cmf[curr_idx] / curr_cmf[-1]
        assert ZERO <= low
        assert low < high
        assert high <= ONE
        while 1:
            if (low ^ high) & TOP_MASK == 0:  # if MSBs are equal
                bit = 0 if high & TOP_MASK == 0 else 1
                encoded_bits += output_bit_plus_pending(bit, num_pending_bits)
                num_pending_bits = 0
                low = (low << 1) & MASK
                high = ((high << 1) & MASK) | 1
                assert ZERO <= low < high <= ONE, (ZERO <= low, high <= ONE, bin_in_bits(low, STATE_SIZE), bin_in_bits(high, STATE_SIZE))
            elif (low & SECOND_MASK) != 0 and (high & SECOND_MASK) == 0:  # if low=01... and high=10...
                num_pending_bits += 1
                low = ((low ^ SECOND_MASK) << 1) & MASK          # flip the second MSB and shift in 0 from the right
                high = (((high ^ SECOND_MASK) << 1) | 1) & MASK  # flip the second MSB and shift in 1 from the right
                assert ZERO <= low < high <= ONE, (ZERO <= low, high <= ONE, bin_in_bits(low, STATE_SIZE), bin_in_bits(high, STATE_SIZE))
            else:
                break
    # Terminate: adapted from David MacKay's code
    if (HALF - low) > (high - HALF):
        w = HALF - low
        bit = 0
    else:
        w = high - HALF
        bit = 1
    encoded_bits += output_bit_plus_pending(bit, num_pending_bits)
    num_pending_bits = 0
    while w < HALF:
        encoded_bits += str(1 - bit)
        w *= 2
    return encoded_bits

def output_bit_plus_pending(bit, num_pending_bits):
    assert bit in {0, 1}
    return str(bit) + num_pending_bits * str(1 - bit)

def bin_in_bits(x, width):
    # Helper used in the assert messages above: x as a width-bit binary string.
    return format(x, '0{}b'.format(width))
The following wrapper combines the LZ probability assignment with the arithmetic encoder; you can use it in the problems below.
def compress_with_LZ_prob(string, char_to_index):
    pmfs = prob_LZ78_seq(string, char_to_index)
    return arithmetic_enc_offline(string, char_to_index, pmfs)
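As a quick sketch of the codelength claim above, we can compare the number of emitted bits with $\left\lceil\log_2 \frac{1}{q_{\text{LZ}}(x^n)}\right\rceil+1$ (the offline terminator may cost an extra bit or two):
A = '10111111'
bits = compress_with_LZ_prob(A, chars_to_dict('01'))
target = np.ceil(-log2_prob_LZ78(A, chars_to_dict('01'))) + 1
print(len(bits), target)  # the two should be close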
Generate a random sequence of length $n=10000$ from the binary symmetric Markov chain of parameter $p\in \{0.02, 0.04, \ldots, 0.48\}$, and compress the sequence using compress_with_LZ_prob.
Repeat the experiment $N=50$ times for each $p$ and take an average.
Can you compare this number with the fundamental limit of the lossless compression of this source?
Plot the fundamental limit of the compression and the average of compression ratios for each $p$ in the same plot.
n = 1000  # note: smaller than the n=10000, N=50 in the problem statement,
N = 10    # presumably to keep the runtime manageable
plist = np.arange(0.02, 0.5, 0.02)
compress_ratio_list = np.zeros(len(plist))
for i, p in enumerate(plist):
    P = np.array([[1-p, p], [p, 1-p]])
    print("For p={}".format(p))
    for _ in range(N):
        x, log2prob = generate_mc(n, P)
        xstr = ''.join([str(xi) for xi in x])
        encoded_bits = compress_with_LZ_prob(xstr, chars_to_dict('01'))
        compress_ratio_list[i] += len(encoded_bits) / n
compress_ratio_list /= N
def bin_entropy(p):
    assert 0 < p < 1  # avoid log2(0); note that H(0) = H(1) = 0
    return -p*np.log2(p) - (1-p)*np.log2(1-p)
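Quick checks (a sketch): $H(1/2)=1$ bit, and $H$ is symmetric about $1/2$.
assert np.isclose(bin_entropy(0.5), 1.0)
assert np.isclose(bin_entropy(0.1), bin_entropy(0.9))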
plt.plot(plist, compress_ratio_list*100, label="LZ+ari_enc", marker='x')
plt.plot(plist, [bin_entropy(p)*100 for p in plist], label="Entropy rate", marker='*')
plt.title('Compression of symmetric binary Markov chain ($p$)')
plt.xlabel('Transition probability $p$')
plt.ylabel('Compression ratio (%)')
plt.legend(loc=4)
The fundamental limit of lossless compression is the entropy rate $\overline{H}$. For a stationary Markov chain, $\overline{H}=\sum_s \pi(s)H(P(\cdot|s))$; for the bsmc source, both rows of $P$ have entropy $H(p)$ and $\pi=[1/2,1/2]$, so the entropy rate reduces to the binary entropy function $H(p)$. The plot shows that our universal compression scheme tracks the binary entropy function as $p$ varies; as the sequence length grows, the compression ratio should converge to the entropy rate.
For parts (b) and (c), report the compression ratio, i.e., the number of compressed bits of a sequence of length $n$ divided by the $n\log_2 M$ bits of a uniform encoding, which maps each character from an alphabet of size $M$ to $\log_2 M$ bits.
def compress(filename, char_to_index=None):
    with open(filename, 'r', encoding="utf8") as file:
        string = file.read()
    if char_to_index is None:
        char_to_index = chars_to_dict(string)
    encoded_bits = compress_with_LZ_prob(string, char_to_index)
    compress_ratio = len(encoded_bits) / (len(string) * np.log2(len(char_to_index)))
    print("The compression ratio is {}%.".format(compress_ratio*100))
In the data folder you can find CElegans_sample.txt, which contains the first 2000 symbols of Chromosome I of C. elegans (extracted from CE10.2bit, available at http://hgdownload.soe.ucsc.edu/goldenPath/ce10/bigZips/). Compress the sequence and find the compression ratio.
%time compress('data/CElegans_sample.txt', chars_to_dict('AGTC'))
Indeed, we can observe that our genome sequence is not well modeled by a stationary ergodic process. This can be seen by tracking the compression ratio as we increase the length of the sequence: if the sequence were generated by a stationary ergodic process, the LZ probability assignment should approximate the source better as the sample size $n$ grows. However, the following experiment shows that the compression ratio gets worse as we increase the sample size!
filename = 'data/CElegans_sample.txt'
with open(filename, 'r', encoding="utf8") as file:
    string = file.read()
string = string.upper()
char_to_index = chars_to_dict('AGTC')
Nlist = range(100, len(string), 100)
compress_ratio_list = np.zeros(len(Nlist))
for i, N in enumerate(Nlist):
    X = string[:N+1]
    pmfs = prob_LZ78_seq(X, char_to_index)
    encoded_bits = arithmetic_enc_offline(X, char_to_index, pmfs)
    compress_ratio_list[i] = len(encoded_bits) / (len(X) * np.log2(len(char_to_index)))
plt.plot(Nlist, compress_ratio_list * 100, marker="*")
plt.xlabel('N')
plt.ylabel('compression ratio (%)');
In the data folder you can find sample_the_little_prince_xx.txt, containing the entire text of "The Little Prince" in both French and English (translation).
The texts are preprocessed so that they contain only the blank (' ') and the English and/or French alphabets. Hence, for the English text we consider an alphabet of size 27; for the French text, with 13 additional accented characters, we consider an alphabet of size 40.
nums = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
en_abc = {' ', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'}
fr_letters = {'é',
'à', 'è', 'ù',
'â', 'ê', 'î', 'ô', 'û',
'ç',
'ë', 'ï', 'ü'}
def preprocess(filename, lang="EN"):
    '''
    Keep only characters in the target alphabet; the numbers should be
    taken care of manually. Returns the cleaned string.
    '''
    with open(filename, 'r') as file:
        string = file.read()
    alphabet = nums | en_abc
    if lang == "FR":
        alphabet |= fr_letters
    string = string.lower()
    for char in set(string):  # iterate over distinct characters
        if char not in alphabet:
            if char == "'":
                string = string.replace(char, '')
                print("Replaced {} with ''!".format(char))
            else:
                string = string.replace(char, ' ')
                print("Replaced {} with ' '!".format(char))
    for i in range(10):
        string = string.replace('  ', ' ')  # collapse repeated blanks
    return string
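A hypothetical usage of preprocess (note that the cleaned string is returned rather than written back to the file):
clean_fr = preprocess('data/sample_the_little_prince_fr.txt', lang="FR")
print(clean_fr[:80])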
filename_list = ['data/sample_the_little_prince_en.txt',
                 'data/sample_the_little_prince_fr.txt']
alphabet_list = [en_abc, en_abc | fr_letters]
for filename, alphabet in zip(filename_list, alphabet_list):
    print("For {}:".format(filename), end='')
    %time compress(filename, chars_to_dict(alphabet))
Which language is more efficient?
From the compression ratios printed above, French seems to be more efficient than English, at least in writing "The Little Prince"! 😀