Initial commit with a working implementation.

This commit is contained in:
Sylvain Glaize 2023-12-02 19:24:17 +01:00
commit 4799d08f2f
8 changed files with 672 additions and 0 deletions

241
.gitignore vendored Normal file
View File

@ -0,0 +1,241 @@
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

30
LICENSE Normal file
View File

@ -0,0 +1,30 @@
BSD 3-Clause License
Copyright (c) 2023, Sylvain Glaize for the Python version
Copyright (c) 2021, Einar Saukas for the original version which is based upon
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

32
README.md Normal file
View File

@ -0,0 +1,32 @@
ZX0 compressor for Python
=========================
ZX0 compressor for Python is an implementation of the [ZX0](https://github.com/einar-saukas/ZX0) compression algorithm
in Python.
I needed a Python implementation of ZX0 for simplicity reasons on the build chain I use
for a project. Also as an exersice.
The code is heavily based on the original ZX0 C code.
It is also not optimized for speed at all, contrary to the original ZX0 C code.
Usage:
```
> python3 pyzx0.py -h
usage: pyzx0.py [-h] [-f] [-c] [-b] [-q] [-s SKIP] input_name [output_name]
pyZX0 v2.2: Python port of ZX0 compressor by Einar Saukas for the same version.
positional arguments:
input_name Input file
output_name Output file
options:
-h, --help show this help message and exit
-f Force overwrite of output file
-c Classic file format (v1.*)
-b Compress backwards
-q Quick non-optimal compression
-s SKIP Skip first N bytes of input file
```

130
compress.py Normal file
View File

@ -0,0 +1,130 @@
from optimize import INITIAL_OFFSET, optimize
# Offset limits passed to optimize() as offset_limit: the full ZX0 (v2) limit,
# and the smaller ZX7-era limit used for quick (non-optimal) compression.
MAX_OFFSET_ZX0 = 32640
MAX_OFFSET_ZX7 = 2176
class CompressStream:
    """Output bit/byte stream for the ZX0 compressed format.

    Wraps a pre-sized bytearray and interleaves whole bytes with a "bit
    reservoir" byte whose bits are filled most-significant first.  Also
    tracks ``delta``, the maximum lead of the output cursor over the input
    cursor (reported by the CLI summary; presumably relevant for in-place
    decompression — see the original ZX0 documentation).
    """

    def __init__(self, optimal, input_size, skip, backwards_mode):
        # optimal.bits is the total parsing cost in bits; +25 covers the end
        # marker (1 indicator bit + 17 bits for the Elias gamma of 256) plus
        # rounding up to a whole byte.
        output_size = (optimal.bits + 25) // 8
        self.output_data = bytearray(output_size)
        self.backwards_mode = backwards_mode
        self.input_index = skip
        self.output_index = 0
        # Running difference between output and input cursors; its maximum
        # over the whole run is delta.
        self.diff = output_size - input_size + skip
        self.bit_mask = 0   # mask of the next free bit in the reservoir byte (0 = none left)
        self.bit_index = 0  # index of the current reservoir byte in output_data
        self.backtrack = True  # when True, the next bit patches the previous byte's LSB
        self.delta = 0

    def read_bytes(self, n):
        """Advance the input cursor by n bytes and update the delta high-water mark."""
        self.input_index += n
        self.diff += n
        if self.delta < self.diff:
            self.delta = self.diff

    def write_byte(self, value):
        """Append one whole byte to the output."""
        self.output_data[self.output_index] = value
        self.output_index += 1
        self.diff -= 1

    def write_bit(self, value):
        """Append one bit, honoring backtrack mode and the bit reservoir."""
        if self.backtrack:
            # Reuse the least significant bit of the byte just written.
            if value:
                self.output_data[self.output_index - 1] |= 1
            self.backtrack = False
        else:
            if not self.bit_mask:
                # Reservoir exhausted: start a new byte, filled MSB-first.
                self.bit_mask = 128
                self.bit_index = self.output_index
                self.write_byte(0)
            if value:
                self.output_data[self.bit_index] |= self.bit_mask
            self.bit_mask >>= 1

    def write_interlaced_elias_gamma(self, value, invert_mode):
        """Write value as interlaced Elias gamma code.

        A flag bit precedes each data bit: the flag equals backwards_mode
        while more data bits follow, and its complement terminates the code.
        When invert_mode is true the data bits are complemented (v2 format).
        """
        # Find the highest power of two <= value.
        i = 2
        while i <= value:
            i <<= 1
        i >>= 1
        # Emit the remaining bits of value, high to low, each with its flag.
        while i > 1:
            i >>= 1
            self.write_bit(self.backwards_mode)
            self.write_bit(not (value & i) if invert_mode else (value & i))
        self.write_bit(not self.backwards_mode)

    def set_backtrack(self):
        """Make the next bit reuse the LSB of the last written byte."""
        self.backtrack = True
def reverse_chain(optimal):
    """Reverse a None-terminated linked list of blocks in place.

    The optimizer links blocks from the end of the input back to the start;
    this returns the head of the re-linked, front-to-back chain (or None for
    an empty chain).
    """
    reversed_head = None
    node = optimal
    while node:
        # Relink the current node onto the reversed prefix, then step forward.
        node.chain, reversed_head, node = reversed_head, node, node.chain
    return reversed_head
def compress(optimal, input_data, skip, backwards_mode, invert_mode):
    """Serialize the optimal parsing chain into the ZX0 bit stream.

    optimal is the tail block produced by optimize(); input_data the raw
    bytes; skip the number of leading bytes to leave uncompressed.
    Returns (output_data, delta) where output_data is the compressed
    bytearray and delta the stream's cursor-lead high-water mark.
    """
    # Reverse the chain so the blocks can be walked front to back
    prev = reverse_chain(optimal)
    stream = CompressStream(optimal, len(input_data), skip, backwards_mode)
    last_offset = INITIAL_OFFSET
    optimal = prev.chain  # Skip the fake kickstart block
    while optimal:
        # Number of input bytes covered by this block.
        length = optimal.index - prev.index
        if optimal.offset == 0:
            stream.write_bit(0)  # Literal indicator
            stream.write_interlaced_elias_gamma(length, False)  # Length
            for i in range(length):  # Copy literal values
                stream.write_byte(input_data[stream.input_index])
                stream.read_bytes(1)
        elif optimal.offset == last_offset:
            stream.write_bit(0)  # Copy from last offset
            stream.write_interlaced_elias_gamma(length, False)  # Length
            stream.read_bytes(length)  # Advance the input index without writing on the output
        else:
            optimal_offset = optimal.offset - 1
            stream.write_bit(1)  # Copy from a new offset
            stream.write_interlaced_elias_gamma(optimal_offset // 128 + 1, invert_mode)  # MSB
            if backwards_mode:
                stream.write_byte((optimal_offset % 128) << 1)  # LSB (backwards)
            else:
                stream.write_byte((127 - optimal_offset % 128) << 1)  # LSB
            # Copy length bytes from the offset
            stream.set_backtrack()  # To use the last bit of the previous byte
            stream.write_interlaced_elias_gamma(length - 1, False)
            stream.read_bytes(length)
        last_offset = optimal.offset
        prev = optimal
        optimal = optimal.chain
    # End marker: a "new offset" indicator followed by the out-of-range MSB 256.
    stream.write_bit(1)
    stream.write_interlaced_elias_gamma(256, invert_mode)
    return stream.output_data, stream.delta
def compress_data(input_data, skip, backwards_mode, classic_mode, quick_mode):
    """Run the full ZX0 pipeline: optimize the parsing, then emit the stream.

    Returns (output_data, delta).  Backwards mode compresses the reversed
    input and reverses the result; quick mode uses the smaller ZX7 offset
    limit; classic mode selects the v1 bit layout (no inverted gamma bits).
    """
    data = input_data[::-1] if backwards_mode else input_data
    offset_limit = MAX_OFFSET_ZX7 if quick_mode else MAX_OFFSET_ZX0
    optimal_chain = optimize(data, skip, offset_limit)
    invert_mode = not classic_mode and not backwards_mode
    output_data, delta = compress(optimal_chain, data, skip, backwards_mode, invert_mode)
    if backwards_mode:
        output_data = output_data[::-1]
    return output_data, delta

104
optimize.py Normal file
View File

@ -0,0 +1,104 @@
from typing import List, Optional
class Block:
    """One decision node in the optimizer's chain of parsing choices.

    Attributes:
        bits:   total encoded cost in bits of the input up to ``index``.
        index:  position in the input data this block ends at.
        offset: match offset used by this block; 0 means a literal run.
        chain:  previous Block in the chain (None terminates it).
    """

    # Many Block instances are allocated per input byte during optimization;
    # __slots__ removes the per-instance __dict__ to cut memory and speed up
    # attribute access.  No code assigns other attributes to Block objects.
    __slots__ = ("chain", "bits", "index", "offset")

    def __init__(self, bits=None, index=None, offset=None, chain=None):
        self.chain = chain
        self.bits = bits
        self.index = index
        self.offset = offset

    def __repr__(self):
        # chain is omitted on purpose: printing it would walk the whole list.
        return f"Block(bits={self.bits}, index={self.index}, offset={self.offset})"
# Offset assumed before any match has been emitted.
INITIAL_OFFSET = 1
# NOTE(review): MAX_SCALE is not referenced in this module — possibly kept
# for parity with the original C sources; confirm before removing.
MAX_SCALE = 50


def offset_ceiling(index, offset_limit):
    """Clamp index into the usable offset range [INITIAL_OFFSET, offset_limit].

    The upper bound is checked first, mirroring the original expression.
    """
    if index > offset_limit:
        return offset_limit
    if index < INITIAL_OFFSET:
        return INITIAL_OFFSET
    return index
def elias_gamma_needed_bits(value):
    """Return the bit count of the interlaced Elias gamma code for value.

    The code uses one flag bit per data bit plus a terminator, which works
    out to 2*floor(log2(value)) + 1 bits; values below 2 need a single bit.
    """
    return 2 * value.bit_length() - 1 if value > 1 else 1
def optimize(input_data, skip, offset_limit):
    """Find a minimum-bits LZ parsing of input_data (dynamic programming).

    Returns the Block covering the last input byte; walking its ``chain``
    backwards yields the chosen parsing.  skip bytes at the start are left
    out of the parsing; offset_limit caps the match offsets considered.
    """
    # The algorithm has a floating window of size window_size describing the previous chain of matches
    input_size = len(input_data)
    window_size = offset_ceiling(input_size - 1, offset_limit) + 1
    # Per-offset state: last literal-run block, last match block, and current
    # run length of consecutive matching bytes at that offset.
    last_literal: List[Optional[Block]] = [None] * window_size
    last_match: List[Optional[Block]] = [None] * window_size
    match_length = [0] * window_size
    # The algorithm is looking for the best match for each index of the input data
    optimal: List[Optional[Block]] = [None] * input_size
    best_length = [0] * input_size
    if input_size > 2:
        best_length[2] = 2
    # Kickstart the algorithm by assigning a fake block
    last_match[INITIAL_OFFSET] = Block(-1, skip - 1, INITIAL_OFFSET, None)
    # The algorithm is checking for the best match for each index of the input data (skipping the skip part)
    for index in range(skip, input_size):
        best_length_size = 2  # It's useless to check for a match of length 1
        max_offset = offset_ceiling(index, offset_limit)
        for offset in range(1, max_offset + 1):
            # Checking for a match in the previous part of the input data, backwards
            if index != skip and index >= offset and input_data[index] == input_data[index - offset]:
                # Option 1: start a new match at this offset right after a literal run.
                current_literal = last_literal[offset]
                if current_literal is not None:
                    length = index - current_literal.index
                    bits = current_literal.bits + 1 + elias_gamma_needed_bits(length)
                    # Chain the current match to the previous literal
                    last_match[offset] = Block(bits, index, offset, current_literal)
                    # Update the best match
                    if not optimal[index] or optimal[index].bits > bits:
                        optimal[index] = last_match[offset]
                # Option 2: extend the running match at this offset.
                match_length[offset] += 1
                if match_length[offset] > 1:
                    if best_length_size < match_length[offset]:
                        # Grow the table of best truncated-match lengths up to
                        # the current run length.
                        bits = (optimal[index - best_length[best_length_size]].bits +
                                elias_gamma_needed_bits(best_length[best_length_size] - 1))
                        while True:
                            best_length_size += 1
                            bits2 = (optimal[index - best_length_size].bits +
                                     elias_gamma_needed_bits(best_length_size - 1))
                            if bits2 <= bits:
                                # Using the full length here is at least as cheap.
                                best_length[best_length_size] = best_length_size
                                bits = bits2
                            else:
                                # Keep the shorter, cheaper truncation.
                                best_length[best_length_size] = best_length[best_length_size - 1]
                            if best_length_size >= match_length[offset]:
                                break
                    length = best_length[match_length[offset]]
                    # Cost: 8 bits for the offset LSB byte, plus the gamma
                    # codes for the offset MSB and the (length - 1) value.
                    bits = (optimal[index - length].bits + 8 +
                            elias_gamma_needed_bits((offset - 1) // 128 + 1) +
                            elias_gamma_needed_bits(length - 1))
                    if not last_match[offset] or last_match[offset].index != index or last_match[offset].bits > bits:
                        last_match[offset] = Block(bits, index, offset, optimal[index - length])
                        if not optimal[index] or optimal[index].bits > bits:
                            optimal[index] = last_match[offset]
            else:
                match_length[offset] = 0  # Resetting the match length
                # Option 3: extend a literal run following the last match at this offset.
                if last_match[offset]:
                    length = index - last_match[offset].index
                    # Literals cost 8 bits per byte plus the run-length gamma code.
                    bits = last_match[offset].bits + 1 + elias_gamma_needed_bits(length) + length * 8
                    last_literal[offset] = Block(bits, index, 0, last_match[offset])
                    if not optimal[index] or optimal[index].bits > bits:
                        optimal[index] = last_literal[offset]
    return optimal[input_size - 1]

88
pyzx0.py Normal file
View File

@ -0,0 +1,88 @@
import argparse
import os
from compress import compress_data
class ApplicationError(Exception):
    """Error reported to the user as a message instead of a traceback."""
def read_input_file(input_name, skip):
    """Read the entire input file into a bytearray.

    On failure (unreadable file, empty file, or skip >= file size) prints an
    "Error: ..." message and terminates the process with status 1.
    """
    try:
        with open(input_name, "rb") as ifp:
            # Determine the input size by seeking to the end.
            ifp.seek(0, os.SEEK_END)
            input_size = ifp.tell()
            ifp.seek(0, os.SEEK_SET)
            if input_size == 0:
                raise ApplicationError("Empty input file")
            if skip >= input_size:
                raise ApplicationError("Skip value exceeds input file size")
            input_data = bytearray(input_size)
            read_count = ifp.readinto(input_data)
            if read_count != input_size:
                raise ApplicationError("Cannot read input file")
    except OSError:
        # Covers missing files as well as permission and I/O errors; the
        # original caught only FileNotFoundError, so e.g. a PermissionError
        # escaped as a raw traceback despite this message existing for it.
        print(f"Error: Cannot access input file {input_name}")
        exit(1)
    except ApplicationError as e:
        print(f"Error: {e}")
        exit(1)
    return input_data
def write_output_file(output_name, output_data):
    """Write output_data to output_name, truncating any existing file.

    The with-statement closes the file; the original also called
    ofp.close() redundantly inside the with block.
    """
    with open(output_name, "wb") as ofp:
        ofp.write(output_data)
def write_summary(backwards_mode, delta, input_data, output_data, skip):
    """Print the one-line compression report shown after a successful run."""
    direction = " backwards" if backwards_mode else ""
    source_size = len(input_data) - skip  # skipped bytes are not compressed
    compressed_size = len(output_data)
    print(f"File compressed{direction} from {source_size} to {compressed_size} bytes! (delta {delta})")
def main():
    """Command-line entry point: parse arguments, compress, and report.

    Returns 0 on success; prints "Error: ..." and exits with status 1 on
    failure, matching read_input_file's behavior.
    """
    parser = argparse.ArgumentParser(
        description='pyZX0 v2.2: Python port of ZX0 compressor by Einar Saukas for the same version.')
    parser.add_argument('-f', action='store_true', help='Force overwrite of output file', dest='forced_mode')
    parser.add_argument('-c', action='store_true', help='Classic file format (v1.*)', dest='classic_mode')
    parser.add_argument('-b', action='store_true', help='Compress backwards', dest='backwards_mode')
    parser.add_argument('-q', action='store_true', help='Quick non-optimal compression', dest='quick_mode')
    parser.add_argument('-s', type=int, help='Skip first N bytes of input file', dest='skip')
    parser.add_argument('input_name', type=str, help='Input file')
    parser.add_argument('output_name', type=str, nargs='?', help='Output file')
    args = parser.parse_args()

    skip = args.skip if args.skip else 0  # -s is None when not given
    output_name = args.output_name if args.output_name else args.input_name + ".zx0"

    input_data = read_input_file(args.input_name, skip)
    # Refuse to clobber an existing output file unless -f was given.  The
    # original raised an ApplicationError here that nothing caught, so users
    # saw a raw traceback instead of a clean error message.
    if not args.forced_mode and os.path.exists(output_name):
        print(f"Error: Already existing output file {output_name}")
        exit(1)
    output_data, delta = compress_data(input_data, skip, args.backwards_mode,
                                       args.classic_mode, args.quick_mode)
    write_output_file(output_name, output_data)
    write_summary(args.backwards_mode, delta, input_data, output_data, skip)
    return 0


if __name__ == "__main__":
    main()

23
test_compress.py Normal file
View File

@ -0,0 +1,23 @@
import unittest
from compress import compress_data
class TestCompress(unittest.TestCase):
    """Checks compress_data output against known-good compressed streams."""

    @staticmethod
    def _compress(raw):
        # Every case uses the default configuration: v2 format, forwards,
        # optimal (non-quick) compression, no skipped prefix.
        return compress_data(bytearray(raw), skip=0,
                             backwards_mode=False, classic_mode=False, quick_mode=False)

    def test_compress_abcdef(self):
        # All-literal input: no repeats, so nothing to match.
        output_data, delta = self._compress(b"abcdef")
        self.assertEqual(bytearray(b'MabcdefUV'), output_data)
        self.assertEqual(2, delta)

    def test_compress_abcabc(self):
        # Repeating input: second half is a match against the first.
        output_data, delta = self._compress(b"abcabc")
        self.assertEqual(bytearray(b'{abc\xfaUU\x80'), output_data)
        self.assertEqual(3, delta)

24
test_optimize.py Normal file
View File

@ -0,0 +1,24 @@
import unittest
from optimize import elias_gamma_needed_bits
class TestEliasGammaBits(unittest.TestCase):
def test_elias_gamma_bits(self):
self.assertEqual(elias_gamma_needed_bits(1), 1)
self.assertEqual(elias_gamma_needed_bits(2), 3)
self.assertEqual(elias_gamma_needed_bits(3), 3)
self.assertEqual(elias_gamma_needed_bits(4), 5)
self.assertEqual(elias_gamma_needed_bits(5), 5)
self.assertEqual(elias_gamma_needed_bits(6), 5)
self.assertEqual(elias_gamma_needed_bits(7), 5)
self.assertEqual(elias_gamma_needed_bits(8), 7)
self.assertEqual(elias_gamma_needed_bits(9), 7)
self.assertEqual(elias_gamma_needed_bits(10), 7)
self.assertEqual(elias_gamma_needed_bits(11), 7)
self.assertEqual(elias_gamma_needed_bits(12), 7)
self.assertEqual(elias_gamma_needed_bits(13), 7)
self.assertEqual(elias_gamma_needed_bits(14), 7)
self.assertEqual(elias_gamma_needed_bits(15), 7)
self.assertEqual(elias_gamma_needed_bits(16), 9)
self.assertEqual(elias_gamma_needed_bits(17), 9)