|
|
|
@ -137,6 +137,21 @@ class CompressorDecompressorTestCase(unittest.TestCase): |
|
|
|
self.assertTrue(lzd.eof) |
|
|
|
self.assertEqual(lzd.unused_data, b"") |
|
|
|
|
|
|
|
def test_decompressor_chunks_empty(self): |
|
|
|
lzd = LZMADecompressor() |
|
|
|
out = [] |
|
|
|
for i in range(0, len(COMPRESSED_XZ), 10): |
|
|
|
self.assertFalse(lzd.eof) |
|
|
|
out.append(lzd.decompress(b'')) |
|
|
|
out.append(lzd.decompress(b'')) |
|
|
|
out.append(lzd.decompress(b'')) |
|
|
|
out.append(lzd.decompress(COMPRESSED_XZ[i:i+10])) |
|
|
|
out = b"".join(out) |
|
|
|
self.assertEqual(out, INPUT) |
|
|
|
self.assertEqual(lzd.check, lzma.CHECK_CRC64) |
|
|
|
self.assertTrue(lzd.eof) |
|
|
|
self.assertEqual(lzd.unused_data, b"") |
|
|
|
|
|
|
|
def test_decompressor_chunks_maxsize(self): |
|
|
|
lzd = LZMADecompressor() |
|
|
|
max_length = 100 |
|
|
|
@ -274,6 +289,16 @@ class CompressorDecompressorTestCase(unittest.TestCase): |
|
|
|
lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4) |
|
|
|
self._test_decompressor(lzd, cdata, lzma.CHECK_NONE) |
|
|
|
|
|
|
|
def test_roundtrip_raw_empty(self): |
|
|
|
lzc = LZMACompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4) |
|
|
|
cdata = lzc.compress(INPUT) |
|
|
|
cdata += lzc.compress(b'') |
|
|
|
cdata += lzc.compress(b'') |
|
|
|
cdata += lzc.compress(b'') |
|
|
|
cdata += lzc.flush() |
|
|
|
lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4) |
|
|
|
self._test_decompressor(lzd, cdata, lzma.CHECK_NONE) |
|
|
|
|
|
|
|
def test_roundtrip_chunks(self): |
|
|
|
lzc = LZMACompressor() |
|
|
|
cdata = [] |
|
|
|
@ -284,6 +309,19 @@ class CompressorDecompressorTestCase(unittest.TestCase): |
|
|
|
lzd = LZMADecompressor() |
|
|
|
self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) |
|
|
|
|
|
|
|
def test_roundtrip_empty_chunks(self): |
|
|
|
lzc = LZMACompressor() |
|
|
|
cdata = [] |
|
|
|
for i in range(0, len(INPUT), 10): |
|
|
|
cdata.append(lzc.compress(INPUT[i:i+10])) |
|
|
|
cdata.append(lzc.compress(b'')) |
|
|
|
cdata.append(lzc.compress(b'')) |
|
|
|
cdata.append(lzc.compress(b'')) |
|
|
|
cdata.append(lzc.flush()) |
|
|
|
cdata = b"".join(cdata) |
|
|
|
lzd = LZMADecompressor() |
|
|
|
self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) |
|
|
|
|
|
|
|
# LZMADecompressor intentionally does not handle concatenated streams. |
|
|
|
|
|
|
|
def test_decompressor_multistream(self): |
|
|
|
|