From 5c2c2d1b364bfd0812e6ccf2ac000148d65c1ae9 Mon Sep 17 00:00:00 2001 From: Zedeldi <66186954+Zedeldi@users.noreply.github.com> Date: Tue, 3 Dec 2024 19:57:33 +0000 Subject: [PATCH] Use field metadata instead of class variable dictionaries --- README.md | 6 +- igelfs/models/base.py | 62 ++++++++++++++---- igelfs/models/boot_registry.py | 94 ++++++++++++--------------- igelfs/models/bootsplash.py | 26 +++----- igelfs/models/directory.py | 96 +++++++++++++-------------- igelfs/models/hash.py | 114 ++++++++++++++------------------- igelfs/models/mixins.py | 15 ++++- igelfs/models/partition.py | 113 ++++++++++++++++---------------- igelfs/models/section.py | 62 +++++++++--------- 9 files changed, 299 insertions(+), 289 deletions(-) diff --git a/README.md b/README.md index 7055495..ebbfdde 100644 --- a/README.md +++ b/README.md @@ -70,10 +70,10 @@ Most of the higher-level models are taken directly from `igelsdk.h`, with added `BaseBytesModel` provides an abstract base class, with concrete methods for handling bytes, shared across various models. `BaseDataModel` is the parent class for all higher-level models. -For these models to be instantiated directly from bytes, they must define `MODEL_ATTRIBUTE_SIZES` -as a mapping of dataclass field names to the length of bytes to read. +For these models to be instantiated directly from bytes, they must define fields +with metadata containing the size of bytes to read. To set default values when instantiating models from nothing with `new`, -add the value to the `DEFAULT_VALUES` dictionary of the model. +add the `default` value to the metadata of the field. #### Section diff --git a/igelfs/models/base.py b/igelfs/models/base.py index 815cbb7..16a57e0 100644 --- a/igelfs/models/base.py +++ b/igelfs/models/base.py @@ -1,8 +1,9 @@ """Concrete base classes for various data models.""" import io +from collections.abc import Mapping from dataclasses import Field, dataclass -from typing import Any, ClassVar, get_args, get_origin +from typing import Any, Iterator, get_args, get_origin from igelfs.models.abc import BaseBytesModel from igelfs.models.collections import DataModelCollection @@ -10,13 +11,35 @@ from igelfs.utils import replace_bytes +@dataclass +class DataModelMetadata(Mapping, DataclassMixin): + """ + Dataclass to provide metadata for data models. + + The metadata for fields must be a mapping. This dataclass is used + to provide a specification for attribute names instead of a dictionary. + """ + + size: int + default: Any = None + + def __getitem__(self, key: str) -> Any: + """Implement get item method for metadata.""" + return self.to_dict(shallow=True)[key] + + def __iter__(self) -> Iterator[str]: + """Implement iterating through metadata.""" + yield from self.to_dict(shallow=True) + + def __len__(self) -> int: + """Implement getting length of metadata.""" + return len(self.to_dict(shallow=True)) + + @dataclass class BaseDataModel(BaseBytesModel, DataclassMixin): """Concrete base class for data model.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] - DEFAULT_VALUES: ClassVar[dict[str, Any]] - def __len__(self) -> int: """Implement __len__ data model method.""" return self.get_actual_size() @@ -37,24 +60,40 @@ def to_bytes(self) -> bytes: fd.seek(0) return fd.read() + @classmethod + def _get_attribute_metadata( + cls: type["BaseDataModel"], + ) -> dict[str, Mapping[str, Any]]: + """Return dictionary of attribute metadata.""" + return {field.name: field.metadata for field in cls.get_fields(init_only=True)} + + @classmethod + def _get_attribute_metadata_by_name( + cls: type["BaseDataModel"], name: str + ) -> Mapping[str, Any]: + """Return metadata for specified attribute.""" + return cls._get_attribute_metadata()[name] + @classmethod def get_model_size(cls: type["BaseDataModel"]) -> int: """Return expected total size of data for model.""" - return sum(cls.MODEL_ATTRIBUTE_SIZES.values()) + return sum( + metadata["size"] for metadata in cls._get_attribute_metadata().values() + ) @classmethod def get_attribute_size(cls: type["BaseDataModel"], name: str) -> int: """Return size of data for attribute.""" - return cls.MODEL_ATTRIBUTE_SIZES[name] + return cls._get_attribute_metadata_by_name(name)["size"] @classmethod def get_attribute_offset(cls: type["BaseDataModel"], name: str) -> int: """Return offset of bytes for attribute.""" offset = 0 - for attribute, size in cls.MODEL_ATTRIBUTE_SIZES.items(): + for attribute, metadata in cls._get_attribute_metadata().items(): if attribute == name: return offset - offset += size + offset += metadata["size"] else: raise KeyError(f"Attribute '{name}' not found") @@ -133,9 +172,10 @@ def from_bytes_with_remaining( def _get_default_bytes(cls: type["BaseDataModel"]) -> bytes: """Return default bytes for new data model.""" data = bytes(cls.get_model_size()) - if not hasattr(cls, "DEFAULT_VALUES"): - return data - for name, value in cls.DEFAULT_VALUES.items(): + for name, metadata in cls._get_attribute_metadata().items(): + value = metadata.get("default") + if not value: + continue if callable(value): value = value() offset = cls.get_attribute_offset(name) diff --git a/igelfs/models/boot_registry.py b/igelfs/models/boot_registry.py index c6d5920..3b6c71f 100644 --- a/igelfs/models/boot_registry.py +++ b/igelfs/models/boot_registry.py @@ -1,13 +1,12 @@ """Data models for the boot registry of a filesystem image.""" from abc import abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from functools import partial -from typing import ClassVar from igelfs.constants import BOOTREG_IDENT, BOOTREG_MAGIC, IGEL_BOOTREG_SIZE -from igelfs.models.base import BaseDataModel +from igelfs.models.base import BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection @@ -21,10 +20,10 @@ def generate_boot_id() -> str: class BootRegistryEntry(BaseDataModel): """Dataclass to describe each entry of boot registry.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = {"flag": 2, "data": 62} - - flag: int # first 9 bits next, 1 bit next present, 6 bit len key - data: bytes + flag: int = field( # first 9 bits next, 1 bit next present, 6 bit len key + metadata=DataModelMetadata(size=2) + ) + data: bytes = field(metadata=DataModelMetadata(size=62)) @property def _flag_bits(self) -> tuple[str, str, str]: @@ -38,7 +37,7 @@ def _flag_bits(self) -> tuple[str, str, str]: bits = ( bin( int( - self.flag.to_bytes(self.MODEL_ATTRIBUTE_SIZES["flag"], "big").hex(), + self.flag.to_bytes(self.get_attribute_size("flag"), "big").hex(), base=16, ) ) @@ -87,8 +86,6 @@ def value(self) -> str: class BaseBootRegistryHeader(BaseDataModel): """Base class for boot registry header.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = {"_": IGEL_BOOTREG_SIZE} - def __post_init__(self) -> None: """Verify identity string on initialisation.""" if self.ident_legacy != BOOTREG_IDENT: @@ -110,39 +107,34 @@ class BootRegistryHeader(BaseBootRegistryHeader): The boot registry resides in section #0 of the image. """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "ident_legacy": 17, - "magic": 4, - "hdr_version": 1, - "boot_id": 21, - "enc_alg": 1, - "flags": 2, - "empty": 82, - "free": 64, - "used": 64, - "dir": 252, - "reserve": 4, - "entry": 504 * BootRegistryEntry.get_model_size(), - } - DEFAULT_VALUES = { - "ident_legacy": BOOTREG_IDENT, - "magic": BOOTREG_MAGIC, - "hdr_version": 1, - "boot_id": generate_boot_id, - } - - ident_legacy: str # "IGEL BOOTREGISTRY" - magic: str # BOOTREG_MAGIC - hdr_version: int # 0x01 for the first - boot_id: str # boot_id - enc_alg: int # encryption algorithm - flags: int # flags - empty: bytes # placeholder - free: bytes # bitmap with free 64 byte blocks - used: bytes # bitmap with used 64 byte blocks - dir: bytes # directory bitmap (4 bits for each block -> key len) - reserve: bytes # placeholder - entry: DataModelCollection[BootRegistryEntry] # real data + ident_legacy: str = field( # "IGEL BOOTREGISTRY" + metadata=DataModelMetadata(size=17, default=BOOTREG_IDENT) + ) + magic: str = field( # BOOTREG_MAGIC + metadata=DataModelMetadata(size=4, default=BOOTREG_MAGIC) + ) + hdr_version: int = field( # 0x01 for the first + metadata=DataModelMetadata(size=1, default=1) + ) + boot_id: str = field( # boot_id + metadata=DataModelMetadata(size=21, default=generate_boot_id) + ) + enc_alg: int = field(metadata=DataModelMetadata(size=1)) # encryption algorithm + flags: int = field(metadata=DataModelMetadata(size=2)) # flags + empty: bytes = field(metadata=DataModelMetadata(size=82)) # placeholder + free: bytes = field( # bitmap with free 64 byte blocks + metadata=DataModelMetadata(size=64) + ) + used: bytes = field( # bitmap with used 64 byte blocks + metadata=DataModelMetadata(size=64) + ) + dir: bytes = field( # directory bitmap (4 bits for each block -> key len) + metadata=DataModelMetadata(size=252) + ) + reserve: bytes = field(metadata=DataModelMetadata(size=4)) # placeholder + entry: DataModelCollection[BootRegistryEntry] = field( # real data + metadata=DataModelMetadata(size=504 * BootRegistryEntry.get_model_size()) + ) def __post_init__(self) -> None: """Verify magic string on initialisation.""" @@ -178,14 +170,10 @@ class BootRegistryHeaderLegacy(BaseBootRegistryHeader): The boot registry resides in section #0 of the image. """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "ident_legacy": 17, - "entry": IGEL_BOOTREG_SIZE - 17, - } - DEFAULT_VALUES = {"ident_legacy": BOOTREG_IDENT} - - ident_legacy: str - entry: bytes + ident_legacy: str = field( + metadata=DataModelMetadata(size=17, default=BOOTREG_IDENT) + ) + entry: bytes = field(metadata=DataModelMetadata(size=IGEL_BOOTREG_SIZE - 17)) def get_entries(self) -> dict[str, str]: """Return dictionary of all boot registry entries.""" @@ -209,8 +197,8 @@ class BootRegistryHeaderFactory: @staticmethod def is_legacy_boot_registry(data: bytes) -> bool: """Return whether bytes represent a legacy boot registry header.""" - ident_legacy = BootRegistryHeader.MODEL_ATTRIBUTE_SIZES["ident_legacy"] - magic = BootRegistryHeader.MODEL_ATTRIBUTE_SIZES["magic"] + ident_legacy = BootRegistryHeader.get_attribute_size("ident_legacy") + magic = BootRegistryHeader.get_attribute_size("magic") if data[ident_legacy : ident_legacy + magic].decode() == BOOTREG_MAGIC: return False return True diff --git a/igelfs/models/bootsplash.py b/igelfs/models/bootsplash.py index ce66bc6..8d7cf41 100644 --- a/igelfs/models/bootsplash.py +++ b/igelfs/models/bootsplash.py @@ -1,13 +1,12 @@ """Data models for bootsplash structures.""" import io -from dataclasses import dataclass -from typing import ClassVar +from dataclasses import dataclass, field from PIL import Image from igelfs.constants import BOOTSPLASH_MAGIC -from igelfs.models.base import BaseDataGroup, BaseDataModel +from igelfs.models.base import BaseDataGroup, BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection @@ -15,11 +14,10 @@ class BootsplashHeader(BaseDataModel): """Dataclass to handle bootsplash header data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = {"magic": 14, "num_splashs": 1} - DEFAULT_VALUES = {"magic": BOOTSPLASH_MAGIC} - - magic: str # BOOTSPLASH_MAGIC - num_splashs: int + magic: str = field( # BOOTSPLASH_MAGIC + metadata=DataModelMetadata(size=14, default=BOOTSPLASH_MAGIC) + ) + num_splashs: int = field(metadata=DataModelMetadata(size=1)) def __post_init__(self) -> None: """Verify magic string on initialisation.""" @@ -31,15 +29,9 @@ def __post_init__(self) -> None: class Bootsplash(BaseDataModel): """Dataclass to handle bootsplash data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "offset": 8, - "length": 8, - "ident": 8, - } - - offset: int - length: int - ident: bytes + offset: int = field(metadata=DataModelMetadata(size=8)) + length: int = field(metadata=DataModelMetadata(size=8)) + ident: bytes = field(metadata=DataModelMetadata(size=8)) @dataclass diff --git a/igelfs/models/directory.py b/igelfs/models/directory.py index e388c5b..d7c02bf 100644 --- a/igelfs/models/directory.py +++ b/igelfs/models/directory.py @@ -1,7 +1,6 @@ """Data models for IGEL filesystem directory.""" -from dataclasses import dataclass -from typing import ClassVar +from dataclasses import dataclass, field from igelfs.constants import ( DIR_MAX_MINORS, @@ -9,7 +8,7 @@ MAX_FRAGMENTS, PartitionType, ) -from igelfs.models.base import BaseDataModel +from igelfs.models.base import BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection from igelfs.models.mixins import CRCMixin @@ -18,27 +17,26 @@ class FragmentDescriptor(BaseDataModel): """Dataclass to handle fragment descriptors.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = {"first_section": 4, "length": 4} - - first_section: int - length: int # number of sections + first_section: int = field(metadata=DataModelMetadata(size=4)) + length: int = field(metadata=DataModelMetadata(size=4)) # number of sections @dataclass class PartitionDescriptor(BaseDataModel): """Dataclass to handle partition descriptors.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "minor": 4, - "type": 2, - "first_fragment": 2, - "n_fragments": 2, - } - - minor: int # a replication of igf_sect_hdr.partition_minor - type: int # partition type, a replication of igf_part_hdr.type - first_fragment: int # index of the first fragment - n_fragments: int # number of additional fragments + minor: int = field( # a replication of igf_sect_hdr.partition_minor + metadata=DataModelMetadata(size=4) + ) + type: int = field( # partition type, a replication of igf_part_hdr.type + metadata=DataModelMetadata(size=2) + ) + first_fragment: int = field( # index of the first fragment + metadata=DataModelMetadata(size=2) + ) + n_fragments: int = field( # number of additional fragments + metadata=DataModelMetadata(size=2) + ) def get_type(self) -> PartitionType: """Return PartitionType from PartitionDescriptor instance.""" @@ -53,38 +51,40 @@ class Directory(BaseDataModel, CRCMixin): The directory resides in section #0 of the image. """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "magic": 4, - "crc": 4, - "dir_type": 2, - "max_minors": 2, - "version": 2, - "dummy": 2, - "n_fragments": 4, - "max_fragments": 4, - "extension": 8, - "partition": DIR_MAX_MINORS * PartitionDescriptor.get_model_size(), - "fragment": MAX_FRAGMENTS * FragmentDescriptor.get_model_size(), - } - DEFAULT_VALUES = { - "magic": DIRECTORY_MAGIC, - "version": 1, - "max_minors": DIR_MAX_MINORS, - "max_fragments": MAX_FRAGMENTS, - } CRC_OFFSET = 4 + 4 - magic: str # DIRECTORY_MAGIC - crc: int - dir_type: int # allows for future extensions - max_minors: int # redundant, allows for dynamic part table - version: int # update count, never used so far - dummy: int # for future extensions - n_fragments: int # total number of fragments - max_fragments: int # redundant, allows for dynamic frag table - extension: bytes # unspecified, for future extensions - partition: DataModelCollection[PartitionDescriptor] - fragment: DataModelCollection[FragmentDescriptor] + # DIRECTORY_MAGIC + magic: str = field(metadata=DataModelMetadata(size=4, default=DIRECTORY_MAGIC)) + crc: int = field(metadata=DataModelMetadata(size=4)) + dir_type: int = field( # allows for future extensions + metadata=DataModelMetadata(size=2) + ) + max_minors: int = field( # redundant, allows for dynamic part table + metadata=DataModelMetadata(size=2, default=DIR_MAX_MINORS) + ) + version: int = field( # update count, never used so far + metadata=DataModelMetadata(size=2, default=1) + ) + dummy: int = field(metadata=DataModelMetadata(size=2)) # for future extensions + n_fragments: int = field( # total number of fragments + metadata=DataModelMetadata(size=4) + ) + max_fragments: int = field( # redundant, allows for dynamic frag table + metadata=DataModelMetadata(size=4, default=MAX_FRAGMENTS) + ) + extension: bytes = field( # unspecified, for future extensions + metadata=DataModelMetadata(size=8) + ) + partition: DataModelCollection[PartitionDescriptor] = field( + metadata=DataModelMetadata( + size=DIR_MAX_MINORS * PartitionDescriptor.get_model_size() + ) + ) + fragment: DataModelCollection[FragmentDescriptor] = field( + metadata=DataModelMetadata( + size=MAX_FRAGMENTS * FragmentDescriptor.get_model_size() + ) + ) def __post_init__(self) -> None: """Verify magic string on initialisation.""" diff --git a/igelfs/models/hash.py b/igelfs/models/hash.py index c9a54c7..cffc0ac 100644 --- a/igelfs/models/hash.py +++ b/igelfs/models/hash.py @@ -1,14 +1,13 @@ """Data models for hash data of a partition.""" import hashlib -from dataclasses import dataclass -from typing import ClassVar +from dataclasses import dataclass, field import rsa from igelfs.constants import HASH_HDR_IDENT from igelfs.keys import IGEL_PUBLIC_KEYS -from igelfs.models.base import BaseDataGroup, BaseDataModel +from igelfs.models.base import BaseDataGroup, BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection @@ -16,21 +15,12 @@ class HashInformation(BaseDataModel): """Dataclass to handle hash information data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "offset_cache": 4, - "offset_hashes": 4, - "count_blocks": 4, - "block_size": 4, - "count_excludes": 2, - "hash_size": 2, - } - - offset_cache: int - offset_hashes: int - count_blocks: int - block_size: int - count_excludes: int - hash_size: int + offset_cache: int = field(metadata=DataModelMetadata(size=4)) + offset_hashes: int = field(metadata=DataModelMetadata(size=4)) + count_blocks: int = field(metadata=DataModelMetadata(size=4)) + block_size: int = field(metadata=DataModelMetadata(size=4)) + count_excludes: int = field(metadata=DataModelMetadata(size=2)) + hash_size: int = field(metadata=DataModelMetadata(size=2)) @dataclass @@ -52,17 +42,13 @@ class HashExclude(BaseDataModel): - 836-836 + (HashHeader.hash_bytes * HashHeader.count_hash) => Section.hash_value """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "start": 8, - "size": 4, - "repeat": 4, - "end": 8, - } - - start: int # start of area to exclude - size: int # size of area to exclude - repeat: int # repeat after ... bytes if 0 -> no repeat - end: int # end address where the exclude area end (only used if repeat is defined) + start: int = field(metadata=DataModelMetadata(size=8)) # start of area to exclude + size: int = field(metadata=DataModelMetadata(size=4)) # size of area to exclude + repeat: int = field( # repeat after ... bytes if 0 -> no repeat + metadata=DataModelMetadata(size=4) + ) + # end address where the exclude area end (only used if repeat is defined) + end: int = field(metadata=DataModelMetadata(size=8)) def get_excluded_indices(self) -> list[int]: """Return list of excluded indices for hash.""" @@ -89,46 +75,44 @@ def get_excluded_indices_from_collection( class HashHeader(BaseDataModel): """Dataclass to handle hash header data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "ident": 6, - "version": 2, - "signature": 512, - "count_hash": 8, - "signature_algo": 1, - "hash_algo": 1, - "hash_bytes": 2, - "blocksize": 4, - "hash_header_size": 4, - "hash_block_size": 4, - "count_excludes": 2, - "excludes_size": 2, - "offset_hash": 4, - "offset_hash_excludes": 4, - "reserved": 4, - } - DEFAULT_VALUES = {"ident": HASH_HDR_IDENT} - - ident: str # Ident string "chksum" + ident: str = field( # Ident string "chksum" + metadata=DataModelMetadata(size=6, default=HASH_HDR_IDENT) + ) # version number of header probably use with flags # something like version = version & 0xff; if (version |= FLAG ...) - version: int - signature: bytes # 512 Bytes -> 4096bit signature length - count_hash: int # count of hash values - signature_algo: ( - int # Used signature algo (which is a define like HASH_SIGNATURE_TYPE_NONE) + version: int = field(metadata=DataModelMetadata(size=2)) + signature: bytes = field( # 512 Bytes -> 4096bit signature length + metadata=DataModelMetadata(size=512) + ) + count_hash: int = field(metadata=DataModelMetadata(size=8)) # count of hash values + # Used signature algo (which is a define like HASH_SIGNATURE_TYPE_NONE) + signature_algo: int = field(metadata=DataModelMetadata(size=1)) + # Used hash algo (which is a define like HASH_ALGO_TYPE_NONE) + hash_algo: int = field(metadata=DataModelMetadata(size=1)) + # bytes used for hash sha256 -> 32bytes, sha512 -> 64bytes + hash_bytes: int = field(metadata=DataModelMetadata(size=2)) + blocksize: int = field( # size of data used for hashing + metadata=DataModelMetadata(size=4) + ) + hash_header_size: int = field( # size of the hash_header (with hash excludes) + metadata=DataModelMetadata(size=4) + ) + hash_block_size: int = field( # size of the hash values block + metadata=DataModelMetadata(size=4) + ) + count_excludes: int = field( # count of struct igel_hash_exclude variables + metadata=DataModelMetadata(size=2) + ) + excludes_size: int = field( # size of struct igel_hash_exclude variables in Bytes + metadata=DataModelMetadata(size=2) ) - hash_algo: int # Used hash algo (which is a define like HASH_ALGO_TYPE_NONE) - hash_bytes: int # bytes used for hash sha256 -> 32bytes, sha512 -> 64bytes - blocksize: int # size of data used for hashing - hash_header_size: int # size of the hash_header (with hash excludes) - hash_block_size: int # size of the hash values block - count_excludes: int # count of struct igel_hash_exclude variables - excludes_size: int # size of struct igel_hash_exclude variables in Bytes - offset_hash: int # offset of hash block from section header in bytes - offset_hash_excludes: ( - int # offset of hash_excludes block from start of igel_hash_header in bytes + offset_hash: int = field( # offset of hash block from section header in bytes + metadata=DataModelMetadata(size=4) ) - reserved: bytes # reserved for further use/padding for excludes alignment + # offset of hash_excludes block from start of igel_hash_header in bytes + offset_hash_excludes: int = field(metadata=DataModelMetadata(size=4)) + # reserved for further use/padding for excludes alignment + reserved: bytes = field(metadata=DataModelMetadata(size=4)) def __post_init__(self) -> None: """Verify ident string on initialisation.""" diff --git a/igelfs/models/mixins.py b/igelfs/models/mixins.py index c853c4d..a57c8c7 100644 --- a/igelfs/models/mixins.py +++ b/igelfs/models/mixins.py @@ -34,8 +34,17 @@ def update_crc(self) -> None: class DataclassMixin: """Provide methods to obtain various data from a dataclass.""" - def to_dict(self) -> dict[str, Any]: - """Return dictionary for data model.""" + def to_dict(self, shallow: bool = False) -> dict[str, Any]: + """ + Return dictionary for data model. + + If shallow is True, do not recurse into nested data structures. + """ + if shallow: + return { + field.name: getattr(self, field.name) + for field in self.get_fields(init_only=False) + } return asdict(self) @classmethod @@ -52,7 +61,7 @@ def get_fields(cls: type[Dataclass], init_only: bool = True) -> Iterator[Field]: @classmethod def get_field_by_name( - cls: type["FieldsMixin"], name: str, *args, **kwargs + cls: type["DataclassMixin"], name: str, *args, **kwargs ) -> Field: """Return field for dataclass by name.""" for field in cls.get_fields(*args, **kwargs): diff --git a/igelfs/models/partition.py b/igelfs/models/partition.py index 7a89938..0e72649 100644 --- a/igelfs/models/partition.py +++ b/igelfs/models/partition.py @@ -1,10 +1,9 @@ """Data models for a partition.""" from dataclasses import dataclass, field -from typing import ClassVar from igelfs.constants import MAX_EXTENT_NUM, ExtentType, PartitionType -from igelfs.models.base import BaseDataGroup, BaseDataModel +from igelfs.models.base import BaseDataGroup, BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection @@ -17,33 +16,34 @@ class PartitionHeader(BaseDataModel): n_blocks / (2 ** cluster_shift) == n_clusters """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "type": 2, - "hdrlen": 2, - "partlen": 8, - "n_blocks": 8, - "offset_blocktable": 8, - "offset_blocks": 8, - "n_clusters": 4, - "cluster_shift": 2, - "n_extents": 2, - "name": 16, - "update_hash": 64, - } - DEFAULT_VALUES = {"hdrlen": 124} - - type: int # partition type - hdrlen: int # length of the complete partition header (incl. extents) - partlen: int # length of this partition (incl. header) - n_blocks: int # number of uncompressed 1k blocks - offset_blocktable: int # needed for compressed partitions - offset_blocks: int # start of the compressed block clusters - n_clusters: int # number of clusters - cluster_shift: int # 2^x blocks make up a cluster - n_extents: int # number of extents, if any - name: bytes # optional character code (for pdir) + type: int = field(metadata=DataModelMetadata(size=2)) # partition type + hdrlen: int = field( # length of the complete partition header (incl. extents) + metadata=DataModelMetadata(size=2, default=124) + ) + partlen: int = field( # length of this partition (incl. header) + metadata=DataModelMetadata(size=8) + ) + n_blocks: int = field( # number of uncompressed 1k blocks + metadata=DataModelMetadata(size=8) + ) + offset_blocktable: int = field( # needed for compressed partitions + metadata=DataModelMetadata(size=8) + ) + offset_blocks: int = field( # start of the compressed block clusters + metadata=DataModelMetadata(size=8) + ) + n_clusters: int = field(metadata=DataModelMetadata(size=4)) # number of clusters + cluster_shift: int = field( # 2^x blocks make up a cluster + metadata=DataModelMetadata(size=2) + ) + n_extents: int = field( # number of extents, if any + metadata=DataModelMetadata(size=2) + ) + name: bytes = field( # optional character code (for pdir) + metadata=DataModelMetadata(size=16) + ) # A high level hash over almost all files, used to determine if an update is needed - update_hash: bytes + update_hash: bytes = field(metadata=DataModelMetadata(size=64)) def __post_init__(self) -> None: """Handle model-specific data post-initialisation.""" @@ -58,7 +58,7 @@ def __post_init__(self) -> None: def get_type(self) -> PartitionType: """Return PartitionType from PartitionHeader instance.""" type_ = int.from_bytes( - self.type.to_bytes(self.MODEL_ATTRIBUTE_SIZES["type"], byteorder="big"), + self.type.to_bytes(self.get_attribute_size("type"), byteorder="big"), byteorder="little", ) return PartitionType(type_ & 0xFF) @@ -72,17 +72,14 @@ def get_name(self) -> str | None: class PartitionExtent(BaseDataModel): """Dataclass to handle partition extent data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "type": 2, - "offset": 8, - "length": 8, - "name": 8, - } - - type: int # type of extent -> ExtentType - offset: int # offset from start of partition header - length: int # size of data in bytes - name: bytes # optional character code + type: int = field( # type of extent -> ExtentType + metadata=DataModelMetadata(size=2) + ) + offset: int = field( # offset from start of partition header + metadata=DataModelMetadata(size=8) + ) + length: int = field(metadata=DataModelMetadata(size=8)) # size of data in bytes + name: bytes = field(metadata=DataModelMetadata(size=8)) # optional character code def get_type(self) -> ExtentType: """Return ExtentType from PartitionExtent instance.""" @@ -97,30 +94,30 @@ def get_name(self) -> str: class PartitionExtents(BaseDataModel): """Dataclass to handle partition extents.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "n_extents": 2, - "extent": MAX_EXTENT_NUM * PartitionExtent.get_model_size(), - } - - n_extents: int - extent: DataModelCollection[PartitionExtent] + n_extents: int = field(metadata=DataModelMetadata(size=2)) + extent: DataModelCollection[PartitionExtent] = field( + metadata=DataModelMetadata( + size=MAX_EXTENT_NUM * PartitionExtent.get_model_size() + ) + ) @dataclass class PartitionExtentReadWrite(BaseDataModel): """Dataclass to handle partition extent read/write data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "ext_num": 1, - "pos": 8, - "size": 8, - "data": 1, - } - - ext_num: int # extent number where to read from - pos: int # position inside extent to start reading from - size: int # size of data (WARNING limited to EXTENT_MAX_READ_WRITE_SIZE) - data: int # destination/src pointer for the data to + ext_num: int = field( # extent number where to read from + metadata=DataModelMetadata(size=1) + ) + pos: int = field( # position inside extent to start reading from + metadata=DataModelMetadata(size=8) + ) + size: int = field( # size of data (WARNING limited to EXTENT_MAX_READ_WRITE_SIZE) + metadata=DataModelMetadata(size=8) + ) + data: int = field( # destination/src pointer for the data to + metadata=DataModelMetadata(size=1) + ) @dataclass diff --git a/igelfs/models/section.py b/igelfs/models/section.py index 5947c3d..6b61a23 100644 --- a/igelfs/models/section.py +++ b/igelfs/models/section.py @@ -3,7 +3,7 @@ import copy import io from dataclasses import dataclass, field -from typing import Any, ClassVar +from typing import Any import magic @@ -13,7 +13,7 @@ IGF_SECT_HDR_MAGIC, SECTION_IMAGE_CRC_START, ) -from igelfs.models.base import BaseDataModel +from igelfs.models.base import BaseDataModel, DataModelMetadata from igelfs.models.collections import DataModelCollection from igelfs.models.hash import Hash, HashExclude, HashHeader from igelfs.models.mixins import CRCMixin @@ -25,28 +25,31 @@ class SectionHeader(BaseDataModel): """Dataclass to handle section header data.""" - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "crc": 4, - "magic": 4, - "section_type": 2, - "section_size": 2, - "partition_minor": 4, - "generation": 2, - "section_in_minor": 4, - "next_section": 4, - "reserved": 6, - } - DEFAULT_VALUES = {"magic": IGF_SECT_HDR_MAGIC[-1]} - - crc: int # crc of the rest of the section - magic: int # magic number (erase count long ago) - section_type: int - section_size: int # log2((section size in bytes) / 65536) - partition_minor: int # partition number (driver minor number) - generation: int # update generation count - section_in_minor: int # n = 0,...,(number of sect.-1) - next_section: int # index of the next section or 0xffffffff = end of chain - reserved: bytes # section header is 32 bytes but 6 bytes are unused + crc: int = field( # crc of the rest of the section + metadata=DataModelMetadata(size=4) + ) + magic: int = field( # magic number (erase count long ago) + metadata=DataModelMetadata(size=4, default=IGF_SECT_HDR_MAGIC[-1]) + ) + section_type: int = field(metadata=DataModelMetadata(size=2)) + section_size: int = field( # log2((section size in bytes) / 65536) + metadata=DataModelMetadata(size=2) + ) + partition_minor: int = field( # partition number (driver minor number) + metadata=DataModelMetadata(size=4) + ) + generation: int = field( # update generation count + metadata=DataModelMetadata(size=2) + ) + section_in_minor: int = field( # n = 0,...,(number of sect.-1) + metadata=DataModelMetadata(size=4) + ) + next_section: int = field( # index of the next section or 0xffffffff = end of chain + metadata=DataModelMetadata(size=4) + ) + reserved: bytes = field( # section header is 32 bytes but 6 bytes are unused + metadata=DataModelMetadata(size=6) + ) def __post_init__(self) -> None: """Verify magic number on initialisation.""" @@ -63,17 +66,14 @@ class Section(BaseDataModel, CRCMixin): post-initialisation to add these attributes. """ - MODEL_ATTRIBUTE_SIZES: ClassVar[dict[str, int]] = { - "header": IGF_SECT_HDR_LEN, - "data": IGF_SECT_DATA_LEN, - } - DEFAULT_VALUES = {"header": SectionHeader.new()} CRC_OFFSET = SECTION_IMAGE_CRC_START - header: SectionHeader + header: SectionHeader = field( + metadata=DataModelMetadata(size=IGF_SECT_HDR_LEN, default=SectionHeader.new()) + ) partition: Partition | None = field(init=False) hash: Hash | None = field(init=False) - data: bytes + data: bytes = field(metadata=DataModelMetadata(size=IGF_SECT_DATA_LEN)) def __post_init__(self) -> None: """Parse data into optional additional attributes."""