bitbake: bitbake: Add piping compression library

Adds a library that implements file-like objects (similar to
gzip.GzipFile) that can stream to arbitrary compression programs. This
is utilized to implement a LZ4 and zstd compression API.

(Bitbake rev: 61c3acd058ea018696bd284b3922d0b458838d05)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt 2021-07-14 10:01:57 -05:00 committed by Richard Purdie
parent 75fad23fc0
commit 8410987884
5 changed files with 338 additions and 0 deletions

View File

@ -29,6 +29,7 @@ tests = ["bb.tests.codeparser",
"bb.tests.runqueue",
"bb.tests.siggen",
"bb.tests.utils",
"bb.tests.compression",
"hashserv.tests",
"layerindexlib.tests.layerindexobj",
"layerindexlib.tests.restapi",

View File

@ -0,0 +1,194 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
# Helper library to implement streaming compression and decompression using an
# external process
#
# This library should be used directly by end users; a wrapper library for the
# specific compression tool should be created
import builtins
import io
import os
import subprocess
def open_wrap(
cls, filename, mode="rb", *, encoding=None, errors=None, newline=None, **kwargs
):
"""
Open a compressed file in binary or text mode.
Users should not call this directly. A specific compression library can use
this helper to provide it's own "open" command
The filename argument can be an actual filename (a str or bytes object), or
an existing file object to read from or write to.
The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for
binary mode, or "rt", "wt", "xt" or "at" for text mode. The default mode is
"rb".
For binary mode, this function is equivalent to the cls constructor:
cls(filename, mode). In this case, the encoding, errors and newline
arguments must not be provided.
For text mode, a cls object is created, and wrapped in an
io.TextIOWrapper instance with the specified encoding, error handling
behavior, and line ending(s).
"""
if "t" in mode:
if "b" in mode:
raise ValueError("Invalid mode: %r" % (mode,))
else:
if encoding is not None:
raise ValueError("Argument 'encoding' not supported in binary mode")
if errors is not None:
raise ValueError("Argument 'errors' not supported in binary mode")
if newline is not None:
raise ValueError("Argument 'newline' not supported in binary mode")
file_mode = mode.replace("t", "")
if isinstance(filename, (str, bytes, os.PathLike)):
binary_file = cls(filename, file_mode, **kwargs)
elif hasattr(filename, "read") or hasattr(filename, "write"):
binary_file = cls(None, file_mode, fileobj=filename, **kwargs)
else:
raise TypeError("filename must be a str or bytes object, or a file")
if "t" in mode:
return io.TextIOWrapper(
binary_file, encoding, errors, newline, write_through=True
)
else:
return binary_file
class CompressionError(OSError):
pass
class PipeFile(io.RawIOBase):
"""
Class that implements generically piping to/from a compression program
Derived classes should add the function get_compress() and get_decompress()
that return the required commands. Input will be piped into stdin and the
(de)compressed output should be written to stdout, e.g.:
class FooFile(PipeCompressionFile):
def get_decompress(self):
return ["fooc", "--decompress", "--stdout"]
def get_compress(self):
return ["fooc", "--compress", "--stdout"]
"""
READ = 0
WRITE = 1
def __init__(self, filename=None, mode="rb", *, stderr=None, fileobj=None):
if "t" in mode or "U" in mode:
raise ValueError("Invalid mode: {!r}".format(mode))
if not "b" in mode:
mode += "b"
if mode.startswith("r"):
self.mode = self.READ
elif mode.startswith("w"):
self.mode = self.WRITE
else:
raise ValueError("Invalid mode %r" % mode)
if fileobj is not None:
self.fileobj = fileobj
else:
self.fileobj = builtins.open(filename, mode or "rb")
if self.mode == self.READ:
self.p = subprocess.Popen(
self.get_decompress(),
stdin=self.fileobj,
stdout=subprocess.PIPE,
stderr=stderr,
close_fds=True,
)
self.pipe = self.p.stdout
else:
self.p = subprocess.Popen(
self.get_compress(),
stdin=subprocess.PIPE,
stdout=self.fileobj,
stderr=stderr,
close_fds=True,
)
self.pipe = self.p.stdin
self.__closed = False
def _check_process(self):
if self.p is None:
return
returncode = self.p.wait()
if returncode:
raise CompressionError("Process died with %d" % returncode)
self.p = None
def close(self):
if self.closed:
return
self.pipe.close()
if self.p is not None:
self._check_process()
self.fileobj.close()
self.__closed = True
@property
def closed(self):
return self.__closed
def fileno(self):
return self.pipe.fileno()
def flush(self):
self.pipe.flush()
def isatty(self):
return self.pipe.isatty()
def readable(self):
return self.mode == self.READ
def writable(self):
return self.mode == self.WRITE
def readinto(self, b):
if self.mode != self.READ:
import errno
raise OSError(
errno.EBADF, "read() on write-only %s object" % self.__class__.__name__
)
size = self.pipe.readinto(b)
if size == 0:
self._check_process()
return size
def write(self, data):
if self.mode != self.WRITE:
import errno
raise OSError(
errno.EBADF, "write() on read-only %s object" % self.__class__.__name__
)
data = self.pipe.write(data)
if not data:
self._check_process()
return data

View File

@ -0,0 +1,17 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
import bb.compress._pipecompress
def open(*args, **kwargs):
return bb.compress._pipecompress.open_wrap(LZ4File, *args, **kwargs)
class LZ4File(bb.compress._pipecompress.PipeFile):
def get_compress(self):
return ["lz4c", "-z", "-c"]
def get_decompress(self):
return ["lz4c", "-d", "-c"]

View File

@ -0,0 +1,28 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
import bb.compress._pipecompress
import shutil
def open(*args, **kwargs):
return bb.compress._pipecompress.open_wrap(ZstdFile, *args, **kwargs)
class ZstdFile(bb.compress._pipecompress.PipeFile):
def __init__(self, *args, num_threads=1, compresslevel=3, **kwargs):
self.num_threads = num_threads
self.compresslevel = compresslevel
super().__init__(*args, **kwargs)
def _get_zstd(self):
if self.num_threads == 1 or not shutil.which("pzstd"):
return ["zstd"]
return ["pzstd", "-p", "%d" % self.num_threads]
def get_compress(self):
return self._get_zstd() + ["-c", "-%d" % self.compresslevel]
def get_decompress(self):
return self._get_zstd() + ["-d", "-c"]

View File

@ -0,0 +1,98 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
from pathlib import Path
import bb.compress.lz4
import bb.compress.zstd
import contextlib
import os
import shutil
import tempfile
import unittest
import subprocess
class CompressionTests(object):
def setUp(self):
self._t = tempfile.TemporaryDirectory()
self.tmpdir = Path(self._t.name)
self.addCleanup(self._t.cleanup)
def _file_helper(self, mode_suffix, data):
tmp_file = self.tmpdir / "compressed"
with self.do_open(tmp_file, mode="w" + mode_suffix) as f:
f.write(data)
with self.do_open(tmp_file, mode="r" + mode_suffix) as f:
read_data = f.read()
self.assertEqual(read_data, data)
def test_text_file(self):
self._file_helper("t", "Hello")
def test_binary_file(self):
self._file_helper("b", "Hello".encode("utf-8"))
def _pipe_helper(self, mode_suffix, data):
rfd, wfd = os.pipe()
with open(rfd, "rb") as r, open(wfd, "wb") as w:
with self.do_open(r, mode="r" + mode_suffix) as decompress:
with self.do_open(w, mode="w" + mode_suffix) as compress:
compress.write(data)
read_data = decompress.read()
self.assertEqual(read_data, data)
def test_text_pipe(self):
self._pipe_helper("t", "Hello")
def test_binary_pipe(self):
self._pipe_helper("b", "Hello".encode("utf-8"))
def test_bad_decompress(self):
tmp_file = self.tmpdir / "compressed"
with tmp_file.open("wb") as f:
f.write(b"\x00")
with self.assertRaises(OSError):
with self.do_open(tmp_file, mode="rb", stderr=subprocess.DEVNULL) as f:
data = f.read()
class LZ4Tests(CompressionTests, unittest.TestCase):
def setUp(self):
if shutil.which("lz4c") is None:
self.skipTest("'lz4c' not found")
super().setUp()
@contextlib.contextmanager
def do_open(self, *args, **kwargs):
with bb.compress.lz4.open(*args, **kwargs) as f:
yield f
class ZStdTests(CompressionTests, unittest.TestCase):
def setUp(self):
if shutil.which("zstd") is None:
self.skipTest("'zstd' not found")
super().setUp()
@contextlib.contextmanager
def do_open(self, *args, **kwargs):
with bb.compress.zstd.open(*args, **kwargs) as f:
yield f
class PZStdTests(CompressionTests, unittest.TestCase):
def setUp(self):
if shutil.which("pzstd") is None:
self.skipTest("'pzstd' not found")
super().setUp()
@contextlib.contextmanager
def do_open(self, *args, **kwargs):
with bb.compress.zstd.open(*args, num_threads=2, **kwargs) as f:
yield f