From ae26c8960a35a1102a623e4c8e11929340b0ebe8 Mon Sep 17 00:00:00 2001 From: Roberto Nobrega Date: Thu, 12 Dec 2024 10:23:34 -0300 Subject: [PATCH] refactor!: merge lossless source coding encoders/decoders into code classes - Removed `FixedToVariableEncoder`, `FixedToVariableDecoder`, `VariableToFixedEncoder`, `VariableToFixedDecoder`. Instead of `encoder = FixedToVariableEncoder(code); output = encoder(input)`, use `output = code.encode(input)`. --- site/toc.yaml | 4 -- .../_lossless_coding/FixedToVariableCode.py | 47 +++++++++++++++++- .../FixedToVariableDecoder.py | 48 ------------------ .../FixedToVariableEncoder.py | 38 -------------- .../_lossless_coding/VariableToFixedCode.py | 47 +++++++++++++++++- .../VariableToFixedDecoder.py | 38 -------------- .../VariableToFixedEncoder.py | 49 ------------------- src/komm/_lossless_coding/__init__.py | 8 --- .../test_fixed_to_variable_code.py | 12 ++--- .../test_variable_to_fixed_code.py | 6 +-- 10 files changed, 98 insertions(+), 199 deletions(-) delete mode 100644 src/komm/_lossless_coding/FixedToVariableDecoder.py delete mode 100644 src/komm/_lossless_coding/FixedToVariableEncoder.py delete mode 100644 src/komm/_lossless_coding/VariableToFixedDecoder.py delete mode 100644 src/komm/_lossless_coding/VariableToFixedEncoder.py diff --git a/site/toc.yaml b/site/toc.yaml index 02612761..a0222ce9 100644 --- a/site/toc.yaml +++ b/site/toc.yaml @@ -74,12 +74,8 @@ Source coding: Lossless coding: - FixedToVariableCode - HuffmanCode - - FixedToVariableEncoder - - FixedToVariableDecoder - VariableToFixedCode - TunstallCode - - VariableToFixedEncoder - - VariableToFixedDecoder Quantization: - ScalarQuantizer - LloydMaxQuantizer diff --git a/src/komm/_lossless_coding/FixedToVariableCode.py b/src/komm/_lossless_coding/FixedToVariableCode.py index f5373cfc..420ba206 100644 --- a/src/komm/_lossless_coding/FixedToVariableCode.py +++ b/src/komm/_lossless_coding/FixedToVariableCode.py @@ -6,7 +6,7 @@ from typing_extensions import Self from .._util.information_theory import PMF -from .util import Word, is_prefix_free, is_uniquely_decodable +from .util import Word, is_prefix_free, is_uniquely_decodable, parse_prefix_free @frozen @@ -202,3 +202,48 @@ def rate(self, pmf: npt.ArrayLike) -> float: probabilities = [np.prod(ps) for ps in it.product(pmf, repeat=k)] lengths = [len(word) for word in self.codewords] return np.dot(lengths, probabilities) / k + + def encode(self, source_symbols: npt.ArrayLike) -> npt.NDArray[np.integer]: + r""" + Encodes a sequence of source symbols using the code. + + Parameters: + source_symbols: The sequence of symbols to be encoded. Must be a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. + + Returns: + The sequence of encoded symbols. It is a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. + + Examples: + >>> code = komm.FixedToVariableCode.from_codewords(3, [(0,), (1,0), (1,1)]) + >>> code.encode([1, 0, 1, 0, 2, 0]) + array([1, 0, 0, 1, 0, 0, 1, 1, 0]) + """ + source_symbols = np.asarray(source_symbols) + k, enc = self.source_block_size, self.enc_mapping + return np.concatenate([enc[tuple(s)] for s in source_symbols.reshape(-1, k)]) + + def decode(self, target_symbols: npt.ArrayLike) -> npt.NDArray[np.integer]: + r""" + Decodes a sequence of target symbols using the code. Only works if the code is prefix-free. + + Parameters: + target_symbols: The sequence of symbols to be decoded. Must be a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. + + Returns: + output: The sequence of decoded symbols. It is a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. + + Examples: + >>> code = komm.FixedToVariableCode.from_codewords(3, [(0,), (1,0), (1,1)]) + >>> code.decode([1, 0, 0, 1, 0, 0, 1, 1, 0]) + array([1, 0, 1, 0, 2, 0]) + + >>> code = komm.FixedToVariableCode.from_codewords(2, [(0,), (1,0), (1,1), (1,1,0)]) + >>> code.decode([1, 0, 0, 1, 0, 0, 1, 1, 0]) + Traceback (most recent call last): + ... + ValueError: code is not prefix-free + """ + if not self.is_prefix_free(): + raise ValueError("code is not prefix-free") + target_symbols = np.asarray(target_symbols) + return parse_prefix_free(target_symbols, self.inv_enc_mapping) diff --git a/src/komm/_lossless_coding/FixedToVariableDecoder.py b/src/komm/_lossless_coding/FixedToVariableDecoder.py deleted file mode 100644 index 65b70364..00000000 --- a/src/komm/_lossless_coding/FixedToVariableDecoder.py +++ /dev/null @@ -1,48 +0,0 @@ -import numpy as np -import numpy.typing as npt -from attrs import frozen - -from .FixedToVariableCode import FixedToVariableCode -from .util import parse_prefix_free - - -@frozen -class FixedToVariableDecoder: - r""" - Decoder for prefix-free [fixed-to-variable length codes](/ref/FixedToVariableCode). - - Attributes: - code: The code to be considered, which must be a prefix-free code (that is, no codeword is a prefix of another codeword). - - :::komm.FixedToVariableDecoder.FixedToVariableDecoder.__call__ - """ - - code: FixedToVariableCode - - def __attrs_post_init__(self) -> None: - if not self.code.is_prefix_free(): - raise ValueError("code is not prefix-free") - - def __call__(self, input: npt.ArrayLike) -> npt.NDArray[np.integer]: - r""" - Parameters: Input: - input: The sequence of symbols to be decoded. Must be a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. - - Returns: Output: - output: The sequence of decoded symbols. It is a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. - - Examples: - >>> code = komm.FixedToVariableCode.from_codewords(3, [(0,), (1,0), (1,1)]) - >>> decoder = komm.FixedToVariableDecoder(code) - >>> decoder([1, 0, 0, 1, 0, 0, 1, 1, 0]) - array([1, 0, 1, 0, 2, 0]) - - >>> code = komm.FixedToVariableCode.from_codewords(2, [(0,), (1,0), (1,1), (1,1,0)]) - >>> decoder = komm.FixedToVariableDecoder(code) - Traceback (most recent call last): - ... - ValueError: code is not prefix-free - """ - input = np.asarray(input) - output = parse_prefix_free(input, self.code.inv_enc_mapping) - return output diff --git a/src/komm/_lossless_coding/FixedToVariableEncoder.py b/src/komm/_lossless_coding/FixedToVariableEncoder.py deleted file mode 100644 index 27f647ab..00000000 --- a/src/komm/_lossless_coding/FixedToVariableEncoder.py +++ /dev/null @@ -1,38 +0,0 @@ -import numpy as np -import numpy.typing as npt -from attrs import frozen - -from .FixedToVariableCode import FixedToVariableCode - - -@frozen -class FixedToVariableEncoder: - r""" - Encoder for [fixed-to-variable length codes](/ref/FixedToVariableCode). - - Attributes: - code: The code to be considered. - - :::komm.FixedToVariableEncoder.FixedToVariableEncoder.__call__ - """ - - code: FixedToVariableCode - - def __call__(self, input: npt.ArrayLike) -> npt.NDArray[np.integer]: - r""" - Parameters: Input: - input: The sequence of symbols to be encoded. Must be a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. - - Returns: Output: - output: The sequence of encoded symbols. It is a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. - - Examples: - >>> code = komm.FixedToVariableCode.from_codewords(3, [(0,), (1,0), (1,1)]) - >>> encoder = komm.FixedToVariableEncoder(code) - >>> encoder([1, 0, 1, 0, 2, 0]) - array([1, 0, 0, 1, 0, 0, 1, 1, 0]) - """ - input = np.asarray(input) - k, enc = self.code.source_block_size, self.code.enc_mapping - output = np.concatenate([enc[tuple(s)] for s in input.reshape(-1, k)]) - return output diff --git a/src/komm/_lossless_coding/VariableToFixedCode.py b/src/komm/_lossless_coding/VariableToFixedCode.py index 786b739f..59e36413 100644 --- a/src/komm/_lossless_coding/VariableToFixedCode.py +++ b/src/komm/_lossless_coding/VariableToFixedCode.py @@ -6,7 +6,7 @@ from typing_extensions import Self from .._util.information_theory import PMF -from .util import Word, is_prefix_free +from .util import Word, is_prefix_free, parse_prefix_free @frozen @@ -178,3 +178,48 @@ def rate(self, pmf: npt.ArrayLike) -> float: probabilities = [np.prod([pmf[x] for x in word]) for word in self.sourcewords] lengths = [len(word) for word in self.sourcewords] return self.target_block_size / np.dot(lengths, probabilities) + + def encode(self, source_symbols: npt.ArrayLike) -> npt.NDArray[np.integer]: + r""" + Encodes a sequence of source symbols using the code. + + Parameters: + source_symbols: The sequence of symbols to be encoded. Must be a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. + + Returns: + The sequence of encoded symbols. It is a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. + + Examples: + >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (1,)]) + >>> code.encode([0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0]) + array([0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0]) + + >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (0,)]) + >>> code.encode([0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0]) + Traceback (most recent call last): + ... + ValueError: code is not prefix-free + """ + if not self.is_prefix_free(): + raise ValueError("code is not prefix-free") + source_symbols = np.asarray(source_symbols) + return parse_prefix_free(source_symbols, self.inv_dec_mapping) + + def decode(self, target_symbols: npt.ArrayLike) -> npt.NDArray[np.integer]: + r""" + Decodes a sequence of target symbols using the code. + + Parameters: + target_symbols: The sequence of symbols to be decoded. Must be a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. + + Returns: + The sequence of decoded symbols. It is a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. + + Examples: + >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (1,)]) + >>> code.decode([0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0]) + array([0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0]) + """ + target_symbols = np.asarray(target_symbols) + n, dec = self.target_block_size, self.dec_mapping + return np.concatenate([dec[tuple(s)] for s in target_symbols.reshape(-1, n)]) diff --git a/src/komm/_lossless_coding/VariableToFixedDecoder.py b/src/komm/_lossless_coding/VariableToFixedDecoder.py deleted file mode 100644 index c3ee0a44..00000000 --- a/src/komm/_lossless_coding/VariableToFixedDecoder.py +++ /dev/null @@ -1,38 +0,0 @@ -import numpy as np -import numpy.typing as npt -from attrs import frozen - -from .VariableToFixedCode import VariableToFixedCode - - -@frozen -class VariableToFixedDecoder: - r""" - Decoder for [variable-to-fixed length codes](/ref/VariableToFixedCode). - - Attributes: - code: The code to be considered. - - :::komm.VariableToFixedDecoder.VariableToFixedDecoder.__call__ - """ - - code: VariableToFixedCode - - def __call__(self, input: npt.ArrayLike) -> npt.NDArray[np.integer]: - r""" - Parameters: Input: - input: The sequence of symbols to be decoded. Must be a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. - - Returns: Output: - output: The sequence of decoded symbols. It is a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. - - Examples: - >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (1,)]) - >>> decoder = komm.VariableToFixedDecoder(code) - >>> decoder([0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0]) - array([0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0]) - """ - input = np.asarray(input) - n, dec = self.code.target_block_size, self.code.dec_mapping - output = np.concatenate([dec[tuple(s)] for s in input.reshape(-1, n)]) - return output diff --git a/src/komm/_lossless_coding/VariableToFixedEncoder.py b/src/komm/_lossless_coding/VariableToFixedEncoder.py deleted file mode 100644 index 175b2fc4..00000000 --- a/src/komm/_lossless_coding/VariableToFixedEncoder.py +++ /dev/null @@ -1,49 +0,0 @@ -import numpy as np -import numpy.typing as npt -from attrs import frozen - -from .util import parse_prefix_free -from .VariableToFixedCode import VariableToFixedCode - - -@frozen -class VariableToFixedEncoder: - r""" - Encoder for prefix-free [variable-to-fixed length codes](/ref/VariableToFixedCode). - - Attributes: - code: The code to be considered. - - :::komm.VariableToFixedEncoder.VariableToFixedEncoder.__call__ - """ - - code: VariableToFixedCode - - def __attrs_post_init__(self) -> None: - if not self.code.is_prefix_free(): - raise ValueError("code is not prefix-free") - - def __call__(self, input: npt.ArrayLike) -> npt.NDArray[np.integer]: - r""" - - Parameters: Input: - input: The sequence of symbols to be encoded. Must be a 1D-array with elements in $[0:S)$, where $S$ is the source cardinality of the code. - - Returns: Output: - output: The sequence of encoded symbols. It is a 1D-array with elements in $[0:T)$, where $T$ is the target cardinality of the code. - - Examples: - >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (1,)]) - >>> encoder = komm.VariableToFixedEncoder(code) - >>> encoder([0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0]) - array([0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0]) - - >>> code = komm.VariableToFixedCode.from_sourcewords(2, [(0,0,0), (0,0,1), (0,1), (0,)]) - >>> encoder = komm.VariableToFixedEncoder(code) - Traceback (most recent call last): - ... - ValueError: code is not prefix-free - """ - input = np.asarray(input) - output = parse_prefix_free(input, self.code.inv_dec_mapping) - return output diff --git a/src/komm/_lossless_coding/__init__.py b/src/komm/_lossless_coding/__init__.py index 665b6534..02601bae 100644 --- a/src/komm/_lossless_coding/__init__.py +++ b/src/komm/_lossless_coding/__init__.py @@ -1,19 +1,11 @@ from .FixedToVariableCode import FixedToVariableCode -from .FixedToVariableDecoder import FixedToVariableDecoder -from .FixedToVariableEncoder import FixedToVariableEncoder from .HuffmanCode import HuffmanCode from .TunstallCode import TunstallCode from .VariableToFixedCode import VariableToFixedCode -from .VariableToFixedDecoder import VariableToFixedDecoder -from .VariableToFixedEncoder import VariableToFixedEncoder __all__ = [ "FixedToVariableCode", - "FixedToVariableDecoder", - "FixedToVariableEncoder", "HuffmanCode", "TunstallCode", "VariableToFixedCode", - "VariableToFixedDecoder", - "VariableToFixedEncoder", ] diff --git a/tests/lossless_coding/test_fixed_to_variable_code.py b/tests/lossless_coding/test_fixed_to_variable_code.py index a0d1acae..4d04b65d 100644 --- a/tests/lossless_coding/test_fixed_to_variable_code.py +++ b/tests/lossless_coding/test_fixed_to_variable_code.py @@ -165,7 +165,7 @@ def test_non_injective_enc_mapping(): def test_decoding_not_prefix_free(source_cardinality, codewords): code = komm.FixedToVariableCode.from_codewords(source_cardinality, codewords) with pytest.raises(ValueError): - komm.FixedToVariableDecoder(code) + code.decode([0, 0, 0, 0]) @pytest.mark.parametrize( @@ -228,11 +228,7 @@ def test_rate_invalid_pmf(pmf): ], ) def test_encoding_decoding(code_parameters, x, y): - source_cardinality, target_cardinality, source_block_size, codewords = ( - code_parameters.values() - ) + source_cardinality, _, _, codewords = code_parameters.values() code = komm.FixedToVariableCode.from_codewords(source_cardinality, codewords) - encoder = komm.FixedToVariableEncoder(code) - decoder = komm.FixedToVariableDecoder(code) - assert np.array_equal(encoder(x), y) - assert np.array_equal(decoder(y), x) + assert np.array_equal(code.encode(x), y) + assert np.array_equal(code.decode(y), x) diff --git a/tests/lossless_coding/test_variable_to_fixed_code.py b/tests/lossless_coding/test_variable_to_fixed_code.py index 2197fe86..54580a8f 100644 --- a/tests/lossless_coding/test_variable_to_fixed_code.py +++ b/tests/lossless_coding/test_variable_to_fixed_code.py @@ -106,9 +106,7 @@ def test_encoding_decoding(): code = komm.VariableToFixedCode.from_sourcewords( 2, [(0, 0, 0), (0, 0, 1), (0, 1), (1,)] ) - encoder = komm.VariableToFixedEncoder(code) - decoder = komm.VariableToFixedDecoder(code) x = [0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0] y = [0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0] - assert np.array_equal(encoder(x), y) - assert np.array_equal(decoder(y), x) + assert np.array_equal(code.encode(x), y) + assert np.array_equal(code.decode(y), x)