diff --git a/CHANGELOG.md b/CHANGELOG.md index 660856c..a7b3a59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] ### Changed - **Dropped support of Python versions lower than Python 3.9.** +- The `WheelFile.writestr_*` methods will now preserve `ZipInfo` attributes, + if a `ZipInfo` object has been passed instead of the filename. +- `WheelFile.from_wheelfile` constructor will now preserve `ZipInfo` + attributes of the files from distinfo and data directories of the original + archive. This includes file permissions. ## [0.0.8] - 2021-08-03 ### Changed diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py index 4372ff2..4a5a603 100644 --- a/tests/test_wheelfile.py +++ b/tests/test_wheelfile.py @@ -1077,3 +1077,70 @@ def test_writestr_distinfo_default_compresslevel_is_from_init(self, buf): compresslevel=9) wf.writestr_distinfo('file', b'data') assert wf.infolist()[0]._compresslevel == 9 + + +class TestZipinfoAttributePreserval: + + preserved_fields = pytest.mark.parametrize("field, value", [ + ("date_time", (2000, 1, 2, 3, 4, 2)), + ("compress_type", ZIP_BZIP2), + ("comment", b"Wubba lubba dub dub"), + ("extra", bytes([0x00, 0x00, 0x04, 0x00] + [0xFF]*4)), + ("create_system", 4), + ("create_version", 31), + ("extract_version", 42), + ("internal_attr", 0x02), + ("external_attr", 0x02), + + # Failing / impossible: + + # ZIP stores timestamps with two seconds of granularity + # ("date_time", (2000, 1, 2, 3, 4, 1)), + + # Not preservable without changing other values + # ("flag_bits", 0xFFFFFF), + + # Not supported by Python's zipfile + # ("volume", 0x01), + ]) + + @preserved_fields + def test_writestr_propagates_zipinfo_fields(self, field, value, wf, buf): + arcpath = "some/archive/path" + zi = ZipInfo(arcpath) + setattr(zi, field, value) + + wf.writestr(zi, "_") + wf.close() + + with WheelFile(buf, distname="_", version='0') as wf: + assert getattr(wf.zipfile.getinfo(arcpath), 
field) == value + + @preserved_fields + def test_writestr_data_propagates_zipinfo_fields(self, field, value, wf, buf): + data_path = "some/data" + section = "section" + zi = ZipInfo(data_path) + setattr(zi, field, value) + + wf.writestr_data(section, zi, "_") + wf.close() + + arcpath = wf.data_dirname + "/" + section + "/" + data_path + + with WheelFile(buf, distname="_", version='0') as wf: + assert getattr(wf.zipfile.getinfo(arcpath), field) == value + + @preserved_fields + def test_writestr_distinfo_propagates_zipinfo_fields(self, field, value, wf, buf): + data_path = "some/metadata" + zi = ZipInfo(data_path) + setattr(zi, field, value) + + wf.writestr_distinfo(zi, "_") + wf.close() + + arcpath = wf.distinfo_dirname + "/" + data_path + + with WheelFile(buf, distname="_", version='0') as wf: + assert getattr(wf.zipfile.getinfo(arcpath), field) == value diff --git a/tests/test_wheelfile_cloning.py b/tests/test_wheelfile_cloning.py index d474ea6..41b5d3b 100644 --- a/tests/test_wheelfile_cloning.py +++ b/tests/test_wheelfile_cloning.py @@ -1,6 +1,6 @@ import io import os -from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_STORED, ZipInfo +from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA, ZIP_STORED, ZipInfo import pytest @@ -211,9 +211,37 @@ def test_data_is_copied(self, wf, buf): for arcname, data in archive.items(): assert cwf.zipfile.read(arcname) == data - PRESERVED_ZIPINFO_ATTRS = ['date_time', 'compress_type', 'comment', + def test_substitutes_compress_type_if_passed(self, wf, buf): + wf.writestr("file1", "", compress_type=ZIP_BZIP2) + new_compression = ZIP_LZMA + + with WheelFile.from_wheelfile(wf, buf, compression=new_compression) as cwf: + assert cwf.zipfile.infolist()[0].compress_type == new_compression + + def test_preserves_compress_type_if_not_passed(self, wf, buf): + old_compression = ZIP_BZIP2 + wf.writestr("file1", "", compress_type=old_compression) + + with WheelFile.from_wheelfile(wf, buf) as cwf: + assert 
cwf.zipfile.infolist()[0].compress_type == old_compression + + def test_substitutes_compresslevel_if_passed(self, wf, buf): + wf.writestr("file1", "", compress_type=ZIP_BZIP2, compresslevel=5) + new_compresslevel = 7 + + with WheelFile.from_wheelfile(wf, buf, compression=ZIP_LZMA, compresslevel=new_compresslevel) as cwf: + assert cwf.zipfile.infolist()[0]._compresslevel == new_compresslevel + + def test_preserves_compresslevel_if_not_passed(self, wf, buf): + old_compresslevel = 7 + wf.writestr("file1", "", compress_type=ZIP_BZIP2, compresslevel=old_compresslevel) + + with WheelFile.from_wheelfile(wf, buf) as cwf: + assert cwf.zipfile.infolist()[0]._compresslevel == old_compresslevel + + PRESERVED_ZIPINFO_ATTRS = ['date_time', 'compress_type', '_compresslevel', 'comment', 'extra', 'create_system', 'create_version', - 'extract_version', 'flag_bits', 'volume', + 'extract_version', 'volume', 'internal_attr', 'external_attr'] def custom_zipinfo(self): @@ -222,9 +250,8 @@ def custom_zipinfo(self): zf.comment = b"comment" zf.extra = b"extra" zf.create_system = 2 - zf.create_version = 21 - zf.extract_version = 19 - zf.flag_bits = 0o123 + zf.create_version = 50 + zf.extract_version = 60 zf.volume = 7 zf.internal_attr = 123 zf.external_attr = 321 @@ -240,7 +267,6 @@ def test_zip_attributes_are_preserved_writestr(self, wf, buf, attr): assert getattr(czf, attr) == getattr(zf, attr) - @pytest.mark.xfail(reason="writestr_data does not propagate zinfo yet") @pytest.mark.parametrize("attr", PRESERVED_ZIPINFO_ATTRS) def test_zip_attributes_are_preserved_writestr_data(self, wf, buf, attr): zf = self.custom_zipinfo() @@ -251,9 +277,6 @@ def test_zip_attributes_are_preserved_writestr_data(self, wf, buf, attr): assert getattr(czf, attr) == getattr(zf, attr) - # writestr_data does not propagate zinfo yet - # skipped because it generates lots of warnings - @pytest.mark.xfail(reason="writestr_distinfo does not propagate zinfo yet") @pytest.mark.parametrize("attr", PRESERVED_ZIPINFO_ATTRS) 
def _clone_zipinfo(zinfo: zipfile.ZipInfo, **to_replace) -> zipfile.ZipInfo:
    """Clone a ZipInfo object and update its attributes using to_replace.

    Only the attributes listed in `preserved_attrs` below are carried over
    to the clone. Every other `ZipInfo` attribute of the returned object is
    left at the value a freshly constructed `ZipInfo` has.

    Parameters
    ----------
    zinfo
        The object to copy. It is only read from, never modified.

    **to_replace
        Values to use in the clone instead of the ones found in `zinfo`.
        Accepted names are `filename` and every name from `preserved_attrs`.
        For the preserved attributes, passing `None` is the same as not
        passing the name at all: the value from `zinfo` is kept.

    Returns
    -------
    zipfile.ZipInfo
        A new object, independent of `zinfo`.

    Raises
    ------
    TypeError
        If `to_replace` contains a name that is neither `filename` nor one
        of the preserved attributes. Without this check a misspelled name
        would be dropped silently and the caller would get an unmodified
        clone.
    """

    preserved_attrs = (
        "date_time",
        "compress_type",
        "_compresslevel",
        "comment",
        "extra",
        "create_system",
        "create_version",
        "extract_version",
        "volume",
        "internal_attr",
        "external_attr",
    )

    unknown = sorted(set(to_replace) - {"filename", *preserved_attrs})
    if unknown:
        raise TypeError(
            "_clone_zipinfo() got unexpected attribute name(s): "
            + ", ".join(unknown)
        )

    # `orig_filename` instead of `filename` is used to prevent any possibility
    # of confusing ZipInfo filename normalization.
    new_name = to_replace.pop("filename", zinfo.orig_filename)

    new_zinfo = zipfile.ZipInfo(filename=new_name)
    for attr in preserved_attrs:
        replaced = to_replace.get(attr)
        # `None` means "no replacement requested" - fall back to the original.
        value = getattr(zinfo, attr) if replaced is None else replaced
        setattr(new_zinfo, attr, value)

    return new_zinfo
+ + For `compression` and `compresslevel`, if the value is not passed, + the values from the original archive are preserved. Internally the + data is copied using `ZipFile.writestr` with `ZipInfo` attributes + of the files preserved. If the value is passed, these normally + preserved attributes are substituted. Raises ------ @@ -1380,6 +1422,11 @@ def from_wheelfile( "both objects' paths point at the same file." ) + if compression is None: + default_compression = zipfile.ZIP_DEFLATED + else: + default_compression = compression + new_wf = WheelFile( file_or_path, mode, distname=distname, @@ -1388,7 +1435,7 @@ def from_wheelfile( language_tag=language_tag, abi_tag=abi_tag, platform_tag=platform_tag, - compression=compression, + compression=default_compression, allowZip64=allowZip64, compresslevel=compresslevel, strict_timestamps=strict_timestamps, @@ -1429,19 +1476,24 @@ def from_wheelfile( to_copy = wf.infolist() for zinfo in to_copy: + data = wf.zipfile.read(zinfo) + arcname = zinfo.filename arcname_head, *arcname_tail_parts = arcname.split('/') arcname_tail = '/'.join(arcname_tail_parts) if arcname_head == wf.distinfo_dirname: new_arcname = new_wf.distinfo_dirname + '/' + arcname_tail - new_wf.writestr(new_arcname, wf.zipfile.read(zinfo)) - continue - if arcname_head == wf.data_dirname: + zinfo = _clone_zipinfo(zinfo, filename=new_arcname) + elif arcname_head == wf.data_dirname: new_arcname = new_wf.data_dirname + '/' + arcname_tail - new_wf.writestr(new_arcname, wf.zipfile.read(zinfo)) - continue + zinfo = _clone_zipinfo(zinfo, filename=new_arcname) - new_wf.writestr(zinfo, wf.zipfile.read(zinfo)) + new_wf.writestr( + zinfo, + data, + compress_type=compression, + compresslevel=compresslevel, + ) return new_wf @@ -1966,7 +2018,6 @@ def _os_walk_path_to_arcpath(prefix: str, directory: str, path = os.path.join(arcname, directory[len(prefix):], stem) return path - # TODO: Make sure fields of given ZipInfo objects are propagated def writestr(self, zinfo_or_arcname: 
Union[zipfile.ZipInfo, str], data: Union[bytes, str], @@ -2090,7 +2141,6 @@ def write_data(self, filename: Union[str, Path], # TODO: drive letter should be stripped from the arcname the same way # ZipInfo.from_file does it - # TODO: Make sure fields of given ZipInfo objects are propagated def writestr_data(self, section: str, zinfo_or_arcname: Union[zipfile.ZipInfo, str], data: Union[bytes, str], @@ -2140,10 +2190,14 @@ def writestr_data(self, section: str, else zinfo_or_arcname ) - arcname = self._distinfo_path(section + '/' + arcname.lstrip('/'), - kind='data') + data_arcname = self._distinfo_path(section + '/' + arcname.lstrip('/'), kind='data') - self.writestr(arcname, data, compress_type, compresslevel) + if isinstance(zinfo_or_arcname, zipfile.ZipInfo): + zinfo_or_arcname = _clone_zipinfo(zinfo_or_arcname, filename=data_arcname) + else: + zinfo_or_arcname = data_arcname + + self.writestr(zinfo_or_arcname, data, compress_type, compresslevel) # TODO: Lazy mode should permit writing meta here def write_distinfo(self, filename: Union[str, Path], @@ -2231,7 +2285,6 @@ def write_distinfo(self, filename: Union[str, Path], self.write(filename, arcname, compress_type, compresslevel, recursive=recursive, skipdir=skipdir) - # TODO: Make sure fields of given ZipInfo objects are propagated def writestr_distinfo(self, zinfo_or_arcname: Union[zipfile.ZipInfo, str], data: Union[bytes, str], compress_type: Optional[int] = None, @@ -2288,8 +2341,14 @@ def writestr_distinfo(self, zinfo_or_arcname: Union[zipfile.ZipInfo, str], f"Write would result in a duplicated metadata file: {arcname}." 
) - arcname = self._distinfo_path(arcname.lstrip('/')) - self.writestr(arcname, data, compress_type, compresslevel) + dist_arcname = self._distinfo_path(arcname.lstrip('/')) + + if isinstance(zinfo_or_arcname, zipfile.ZipInfo): + zinfo_or_arcname = _clone_zipinfo(zinfo_or_arcname, filename=dist_arcname) + else: + zinfo_or_arcname = dist_arcname + + self.writestr(zinfo_or_arcname, data, compress_type, compresslevel) @staticmethod def _check_section(section):