commit 4799d08f2f9704037ccefd200d1e306a9fee94cb Author: Sylvain Glaize Date: Sat Dec 2 19:24:17 2023 +0100 Initial commit with a working implementation. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2d62eb2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,241 @@ +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. 
+# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d39ad8b --- /dev/null +++ b/LICENSE @@ -0,0 +1,30 @@ +BSD 3-Clause License + +Copyright (c) 2023, Sylvain Glaize for the Python version +Copyright (c) 2021, Einar Saukas for the original version which is based upon +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..0a8efc4 --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +ZX0 compressor for Python +========================= + +ZX0 compressor for Python is an implementation of the [ZX0](https://github.com/einar-saukas/ZX0) compression algorithm +in Python. + +I needed a Python implementation of ZX0 for simplicity reasons on the build chain I use +for a project. Also as an exercise. + +The code is heavily based on the original ZX0 C code. +It is also not optimized for speed at all, contrary to the original ZX0 C code. + +Usage: + +``` +> python3 pyzx0.py -h +usage: pyzx0.py [-h] [-f] [-c] [-b] [-q] [-s SKIP] input_name [output_name] + +pyZX0 v2.2: Python port of ZX0 compressor by Einar Saukas for the same version. 
# --- end of README.md usage text ------------------------------------------
#
# positional arguments:
#   input_name   Input file
#   output_name  Output file
#
# options:
#   -h, --help   show this help message and exit
#   -f           Force overwrite of output file
#   -c           Classic file format (v1.*)
#   -b           Compress backwards
#   -q           Quick non-optimal compression
#   -s SKIP      Skip first N bytes of input file
#
# ===========================================================================
# compress.py
# ===========================================================================

try:
    # Project-local import (compress.py imports from optimize.py).  Guarded so
    # that this reconstructed single-file listing is also runnable on its own:
    # INITIAL_OFFSET and optimize() are defined further below, at module level,
    # before any function here is actually called.
    from optimize import INITIAL_OFFSET, optimize
except ImportError:
    pass

MAX_OFFSET_ZX0 = 32640  # offset window of the ZX0 format (normal mode)
MAX_OFFSET_ZX7 = 2176   # smaller ZX7-sized window used by quick mode


class CompressStream:
    """Bit/byte output stream used by compress().

    Interleaves whole bytes (literal values, offset LSBs) with single bits
    (block markers, interlaced Elias-gamma codes) in the same output buffer,
    and tracks the ``delta`` margin used for in-place decompression.
    """

    def __init__(self, optimal, input_size, skip, backwards_mode):
        # optimal.bits is the total cost in bits of the chosen block chain;
        # the +25 covers the end marker plus rounding up to whole bytes.
        output_size = (optimal.bits + 25) // 8
        self.output_data = bytearray(output_size)

        self.backwards_mode = backwards_mode
        self.input_index = skip
        self.output_index = 0

        # diff/delta track how far the write pointer may overtake the read
        # pointer if the stream were decompressed in place.
        self.diff = output_size - input_size + skip
        self.bit_mask = 0
        self.bit_index = 0
        # Starts True so the very first bit is merged into the "previous"
        # byte, mirroring the backtrack trick of the C original.
        self.backtrack = True

        self.delta = 0

    def read_bytes(self, n):
        """Advance the input pointer by n bytes and update delta."""
        self.input_index += n
        self.diff += n
        if self.delta < self.diff:
            self.delta = self.diff

    def write_byte(self, value):
        """Append one whole byte to the output."""
        self.output_data[self.output_index] = value
        self.output_index += 1
        self.diff -= 1

    def write_bit(self, value):
        """Append one bit, allocating a new bit-holder byte when needed."""
        if self.backtrack:
            # Merge this bit into the low bit of the last written byte.
            if value:
                self.output_data[self.output_index - 1] |= 1
            self.backtrack = False
        else:
            if not self.bit_mask:
                self.bit_mask = 128
                self.bit_index = self.output_index
                self.write_byte(0)
            if value:
                self.output_data[self.bit_index] |= self.bit_mask
            self.bit_mask >>= 1

    def write_interlaced_elias_gamma(self, value, invert_mode):
        """Write value as an interlaced Elias-gamma code.

        Pairs of (control bit, data bit) are emitted for every bit below the
        leading 1 of value (most significant first, leading 1 implicit),
        followed by a stop bit.  invert_mode complements the data bits.
        """
        i = 2
        while i <= value:
            i <<= 1
        i >>= 1
        while i > 1:
            i >>= 1
            self.write_bit(self.backwards_mode)  # control bit: more bits follow
            self.write_bit(not (value & i) if invert_mode else (value & i))
        self.write_bit(not self.backwards_mode)  # stop bit

    def set_backtrack(self):
        """Make the next bit reuse the low bit of the last written byte."""
        self.backtrack = True


def reverse_chain(optimal):
    """Reverse the linked list of blocks in place.

    Returns the new head, i.e. the original tail (the fake kick-start block).
    """
    previous_block = None
    while optimal:
        next_block = optimal.chain
        optimal.chain = previous_block
        previous_block = optimal
        optimal = next_block
    return previous_block


def compress(optimal, input_data, skip, backwards_mode, invert_mode):
    """Serialize the optimal block chain into the ZX0 bit stream.

    Returns (output_data, delta), where delta is the safety margin needed
    for in-place decompression.
    """
    # Reverse the chain so it can be walked from the first block to the last.
    prev = reverse_chain(optimal)

    # Note: 'optimal' still refers to the final block, whose .bits field
    # holds the total size in bits of the whole compressed stream.
    stream = CompressStream(optimal, len(input_data), skip, backwards_mode)

    last_offset = INITIAL_OFFSET

    optimal = prev.chain  # Skip the fake block
    while optimal:
        length = optimal.index - prev.index

        if optimal.offset == 0:
            stream.write_bit(0)  # Literal indicator
            stream.write_interlaced_elias_gamma(length, False)  # Length
            for i in range(length):  # Copy literal values
                stream.write_byte(input_data[stream.input_index])
                stream.read_bytes(1)
        elif optimal.offset == last_offset:
            stream.write_bit(0)  # Copy from last offset
            stream.write_interlaced_elias_gamma(length, False)  # Length
            stream.read_bytes(length)  # Advance the input index without writing on the output
        else:
            optimal_offset = optimal.offset - 1

            stream.write_bit(1)  # Copy from a new offset
            stream.write_interlaced_elias_gamma(optimal_offset // 128 + 1, invert_mode)  # MSB
            if backwards_mode:
                stream.write_byte((optimal_offset % 128) << 1)  # LSB (backwards)
            else:
                stream.write_byte((127 - optimal_offset % 128) << 1)  # LSB

            # Copy length bytes from the offset
            stream.set_backtrack()  # To use the last bit of the previous byte
            stream.write_interlaced_elias_gamma(length - 1, False)
            stream.read_bytes(length)
            last_offset = optimal.offset

        prev = optimal
        optimal = optimal.chain

    # End marker: "new offset" block with the impossible MSB value 256.
    stream.write_bit(1)
    stream.write_interlaced_elias_gamma(256, invert_mode)

    return stream.output_data, stream.delta


def compress_data(input_data, skip, backwards_mode, classic_mode, quick_mode):
    """Compress input_data with ZX0 and return (output_data, delta).

    skip: number of leading input bytes excluded from compression.
    backwards_mode: compress the reversed data (for backwards decompressors).
    classic_mode: emit the v1.* file format (no inverted Elias-gamma codes).
    quick_mode: use the smaller ZX7 window for faster, non-optimal output.
    """
    if backwards_mode:
        input_data = input_data[::-1]

    optimized_data = optimize(input_data, skip, MAX_OFFSET_ZX7 if quick_mode else MAX_OFFSET_ZX0)
    output_data, delta = compress(optimized_data, input_data, skip, backwards_mode,
                                  not classic_mode and not backwards_mode)
    if backwards_mode:
        output_data = output_data[::-1]

    return output_data, delta


# ===========================================================================
# optimize.py
# ===========================================================================

from typing import List, Optional


class Block:
    """One node of the optimal-parsing chain.

    bits   -- total cost in bits of the stream up to and including this block
    index  -- index of the last input byte covered by this block
    offset -- match offset, or 0 for a literal block
    chain  -- previous block in the chain
    """

    def __init__(self, bits=None, index=None, offset=None, chain=None):
        self.chain = chain
        self.bits = bits
        self.index = index
        self.offset = offset


INITIAL_OFFSET = 1
# NOTE(review): MAX_SCALE is not referenced anywhere in this listing;
# presumably kept from the C original (progress display) — confirm before removing.
MAX_SCALE = 50


def offset_ceiling(index, offset_limit):
    """Clamp index into the valid offset range [INITIAL_OFFSET, offset_limit]."""
    return offset_limit if index > offset_limit else (INITIAL_OFFSET if index < INITIAL_OFFSET else index)


def elias_gamma_needed_bits(value):
    """Return the bit count of the interlaced Elias-gamma code for value."""
    bits = 1
    while value > 1:
        value >>= 1
        bits += 2
    return bits


def optimize(input_data, skip, offset_limit):
    """Find the optimal parse of input_data and return the final Block.

    The returned block is the tail of a chain (via .chain) describing the
    cheapest sequence of literal/match blocks; its .bits field is the total
    compressed size in bits.
    """
    # The algorithm has a floating window of size window_size describing the previous chain of matches
    input_size = len(input_data)
    window_size = offset_ceiling(input_size - 1, offset_limit) + 1

    last_literal: List[Optional[Block]] = [None] * window_size
    last_match: List[Optional[Block]] = [None] * window_size
    match_length = [0] * window_size

    # The algorithm is looking for the best match for each index of the input data
    optimal: List[Optional[Block]] = [None] * input_size
    best_length = [0] * input_size

    if input_size > 2:
        best_length[2] = 2

    # Kickstart the algorithm by assigning a fake block
    last_match[INITIAL_OFFSET] = Block(-1, skip - 1, INITIAL_OFFSET, None)

    # The algorithm is checking for the best match for each index of the input data (skipping the skip part)
    for index in range(skip, input_size):
        best_length_size = 2  # It's useless to check for a match of length 1
        max_offset = offset_ceiling(index, offset_limit)

        for offset in range(1, max_offset + 1):
            # Checking for a match in the previous part of the input data, backwards
            if index != skip and index >= offset and input_data[index] == input_data[index - offset]:
                current_literal = last_literal[offset]
                if current_literal is not None:
                    length = index - current_literal.index
                    bits = current_literal.bits + 1 + elias_gamma_needed_bits(length)

                    # Chain the current match to the previous literal
                    last_match[offset] = Block(bits, index, offset, current_literal)

                    # Update the best match
                    if not optimal[index] or optimal[index].bits > bits:
                        optimal[index] = last_match[offset]

                match_length[offset] += 1

                if match_length[offset] > 1:
                    if best_length_size < match_length[offset]:
                        bits = (optimal[index - best_length[best_length_size]].bits +
                                elias_gamma_needed_bits(best_length[best_length_size] - 1))

                        while True:
                            best_length_size += 1
                            bits2 = (optimal[index - best_length_size].bits +
                                     elias_gamma_needed_bits(best_length_size - 1))
                            if bits2 <= bits:
                                best_length[best_length_size] = best_length_size
                                bits = bits2
                            else:
                                best_length[best_length_size] = best_length[best_length_size - 1]

                            if best_length_size >= match_length[offset]:
                                break

                    length = best_length[match_length[offset]]
                    bits = (optimal[index - length].bits + 8 +
                            elias_gamma_needed_bits((offset - 1) // 128 + 1) +
                            elias_gamma_needed_bits(length - 1))
                    if not last_match[offset] or last_match[offset].index != index or last_match[offset].bits > bits:
                        last_match[offset] = Block(bits, index, offset, optimal[index - length])
                        if not optimal[index] or optimal[index].bits > bits:
                            optimal[index] = last_match[offset]
            else:
                match_length[offset] = 0  # Resetting the match length
                if last_match[offset]:
                    length = index - last_match[offset].index
                    bits = last_match[offset].bits + 1 + elias_gamma_needed_bits(length) + length * 8
                    last_literal[offset] = Block(bits, index, 0, last_match[offset])
                    if not optimal[index] or optimal[index].bits > bits:
                        optimal[index] = last_literal[offset]

    return optimal[input_size - 1]


# ===========================================================================
# pyzx0.py
# ===========================================================================

import argparse
import os

try:
    from compress import compress_data
except ImportError:
    pass  # already defined above in this single-file listing


class ApplicationError(Exception):
    """Fatal, user-facing error condition reported as 'Error: ...'."""
    pass


def read_input_file(input_name, skip):
    """Read the whole input file and return it as a bytearray.

    Prints a friendly error and exits with status 1 when the file is
    missing, empty, unreadable, or shorter than the skip value.
    """
    try:
        with open(input_name, "rb") as ifp:
            # determine input size
            ifp.seek(0, os.SEEK_END)
            input_size = ifp.tell()
            ifp.seek(0, os.SEEK_SET)

            if input_size == 0:
                raise ApplicationError("Empty input file")

            if skip >= input_size:
                raise ApplicationError("Skip value exceeds input file size")

            input_data = bytearray(input_size)
            read_count = ifp.readinto(input_data)

            if read_count != input_size:
                raise ApplicationError("Cannot read input file")

    except FileNotFoundError:
        print(f"Error: Cannot access input file {input_name}")
        exit(1)
    except ApplicationError as e:
        print(f"Error: {e}")
        exit(1)
    return input_data


def write_output_file(output_name, output_data):
    """Write the compressed bytes to output_name."""
    # FIX: dropped a redundant ofp.close() — the 'with' block already closes.
    with open(output_name, "wb") as ofp:
        ofp.write(output_data)


def write_summary(backwards_mode, delta, input_data, output_data, skip):
    """Print the compression summary line."""
    text_backwards = " backwards" if backwards_mode else ""
    initial_size = len(input_data) - skip
    output_size = len(output_data)
    print(
        f"File compressed{text_backwards} from {initial_size} to {output_size} bytes! (delta {delta})")


def main():
    """Command-line entry point; returns 0 on success."""
    parser = argparse.ArgumentParser(
        description='pyZX0 v2.2: Python port of ZX0 compressor by Einar Saukas for the same version.')
    parser.add_argument('-f', action='store_true', help='Force overwrite of output file', dest='forced_mode')
    parser.add_argument('-c', action='store_true', help='Classic file format (v1.*)', dest='classic_mode')
    parser.add_argument('-b', action='store_true', help='Compress backwards', dest='backwards_mode')
    parser.add_argument('-q', action='store_true', help='Quick non-optimal compression', dest='quick_mode')
    parser.add_argument('-s', type=int, help='Skip first N bytes of input file', dest='skip')
    parser.add_argument('input_name', type=str, help='Input file')
    parser.add_argument('output_name', type=str, nargs='?', help='Output file')

    args = parser.parse_args()

    forced_mode = args.forced_mode
    classic_mode = args.classic_mode
    backwards_mode = args.backwards_mode
    quick_mode = args.quick_mode
    skip = args.skip if args.skip else 0
    output_name = args.output_name if args.output_name else args.input_name + ".zx0"

    input_data = read_input_file(args.input_name, skip)

    if not forced_mode and os.path.exists(output_name):
        # FIX: report this like the other fatal errors instead of raising an
        # ApplicationError that nothing catches (which printed a traceback).
        print(f"Error: Already existing output file {output_name}")
        exit(1)

    output_data, delta = compress_data(input_data, skip, backwards_mode, classic_mode, quick_mode)

    write_output_file(output_name, output_data)
    write_summary(backwards_mode, delta, input_data, output_data, skip)

    return 0


if __name__ == "__main__":
    main()


# ===========================================================================
# test_compress.py
# ===========================================================================

import unittest

try:
    from compress import compress_data
except ImportError:
    pass  # already defined above in this single-file listing


class TestCompress(unittest.TestCase):
    def test_compress_abcdef(self):
        input_data = bytearray(b"abcdef")
        output_data, delta = compress_data(input_data, skip=0,
                                           backwards_mode=False, classic_mode=False, quick_mode=False)

        expected_output_data = bytearray(b'MabcdefUV')
        self.assertEqual(expected_output_data, output_data)
        self.assertEqual(2, delta)

    def test_compress_abcabc(self):
        input_data = bytearray(b"abcabc")
        output_data, delta = compress_data(input_data, skip=0,
                                           backwards_mode=False, classic_mode=False, quick_mode=False)

        expected_output_data = bytearray(b'{abc\xfaUU\x80')
        self.assertEqual(expected_output_data, output_data)
        self.assertEqual(3, delta)


# ===========================================================================
# test_optimize.py
# ===========================================================================

try:
    from optimize import elias_gamma_needed_bits
except ImportError:
    pass  # already defined above in this single-file listing


class TestEliasGammaBits(unittest.TestCase):
    def test_elias_gamma_bits(self):
        self.assertEqual(elias_gamma_needed_bits(1), 1)
        self.assertEqual(elias_gamma_needed_bits(2), 3)
        self.assertEqual(elias_gamma_needed_bits(3), 3)
        self.assertEqual(elias_gamma_needed_bits(4), 5)
        self.assertEqual(elias_gamma_needed_bits(5), 5)
        self.assertEqual(elias_gamma_needed_bits(6), 5)
        self.assertEqual(elias_gamma_needed_bits(7), 5)
        self.assertEqual(elias_gamma_needed_bits(8), 7)
        self.assertEqual(elias_gamma_needed_bits(9), 7)
        self.assertEqual(elias_gamma_needed_bits(10), 7)
        self.assertEqual(elias_gamma_needed_bits(11), 7)
        self.assertEqual(elias_gamma_needed_bits(12), 7)
        self.assertEqual(elias_gamma_needed_bits(13), 7)
        self.assertEqual(elias_gamma_needed_bits(14), 7)
        self.assertEqual(elias_gamma_needed_bits(15), 7)
        self.assertEqual(elias_gamma_needed_bits(16), 9)
        self.assertEqual(elias_gamma_needed_bits(17), 9)