Skip to content

Commit

Permalink
added compression level choices -c, --compression (default, fast & hc…
Browse files Browse the repository at this point in the history
…), heavily optimized code base and minor improvements added for file processing
  • Loading branch information
rifsxd committed Mar 14, 2024
1 parent d6e77ac commit 169e079
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 68 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ Usage :

$ pydvpl --mode compress --path /path/to/decompress/compress.yaml --threads 10

$ pydvpl --mode compress --path /path/to/decompress/compress.yaml --compression hc
$ pydvpl --mode compress --path /path/to/decompress/ --compression fast

Requirements :

>python 3.10+
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "pydvpl"
description = "A CLI Tool Coded In Python3 To Convert WoTB ( Dava ) SmartDLC DVPL File Based On LZ4 High Compression."
readme = "README.md"
version = "0.4.0"
version = "0.5.0"
authors = [{ name = "RifsxD", email = "support@rxd-mods.xyz" }]
license = { text = "MIT License" }
requires-python = ">=3.10"
Expand Down
16 changes: 8 additions & 8 deletions src/pydvpl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .converter import (
CompressDVPL,
DecompressDVPL,
ConvertDVPLFiles,
VerifyDVPLFiles,
readDVPLFooter,
createDVPLFooter
compress_dvpl,
decompress_dvpl,
convert_dvpl,
verify_dvpl,
read_dvpl_footer,
create_dvpl_footer
)

from .__version__ import (
Expand All @@ -16,8 +16,8 @@
__author__
)

__all__ = ['CompressDVPL', 'DecompressDVPL', 'ConvertDVPLFiles',
'VerifyDVPLFiles', 'readDVPLFooter', 'createDVPLFooter',
__all__ = ['compress_dvpl', 'decompress_dvpl', 'convert_dvpl',
'verify_dvpl', 'read_dvpl_footer', 'create_dvpl_footer',
'__description__', '__title__', '__version__',
'__author__', '__date__', '__repo__']

2 changes: 1 addition & 1 deletion src/pydvpl/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "PyDVPL"
__description__ = "A CLI Tool Coded In Python3 To Convert WoTB ( Dava ) SmartDLC DVPL File Based On LZ4 High Compression."
__version__ = "0.4.0"
__version__ = "0.5.0"
__date__ = "2024-03-14"
__author__ = "RifsxD"
__repo__ = "https://github.com/rifsxd/pydvpl"
143 changes: 85 additions & 58 deletions src/pydvpl/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import sys
import multiprocessing
import queue
from pathlib import Path
from functools import partial
from .__version__ import __version__, __description__, __title__, __date__, __repo__, __author__



class Color:
RED = '\033[31m'
GREEN = '\033[32m'
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(self, original_size, compressed_size, crc32, type_val):
self.type = type_val


def createDVPLFooter(input_size, compressed_size, crc32_val, type_val):
def create_dvpl_footer(input_size, compressed_size, crc32_val, type_val):
result = bytearray(DVPL_FOOTER_SIZE)
result[:4] = input_size.to_bytes(4, 'little')
result[4:8] = compressed_size.to_bytes(4, 'little')
Expand All @@ -58,7 +58,7 @@ def createDVPLFooter(input_size, compressed_size, crc32_val, type_val):
return result


def readDVPLFooter(buffer):
def read_dvpl_footer(buffer):
if len(buffer) < DVPL_FOOTER_SIZE:
raise ValueError(Color.RED + "InvalidDVPLFooter: Buffer size is smaller than expected" + Color.RESET)

Expand All @@ -75,14 +75,21 @@ def readDVPLFooter(buffer):
return DVPLFooter(original_size, compressed_size, crc32_val, type_val)


def CompressDVPL(buffer):
compressed_block = lz4.block.compress(buffer, store_size=False)
footer_buffer = createDVPLFooter(len(buffer), len(compressed_block), zlib.crc32(compressed_block), DVPL_TYPE_LZ4)
def compress_dvpl(buffer, compression_type="default"):
if compression_type == "fast":
mode = "fast"
elif compression_type == "hc":
mode = "high_compression"
else:
mode = "default"

compressed_block = lz4.block.compress(buffer, store_size=False, mode=mode)
footer_buffer = create_dvpl_footer(len(buffer), len(compressed_block), zlib.crc32(compressed_block), DVPL_TYPE_LZ4)
return compressed_block + footer_buffer


def DecompressDVPL(buffer):
footer_data = readDVPLFooter(buffer)
def decompress_dvpl(buffer):
footer_data = read_dvpl_footer(buffer)
target_block = buffer[:-DVPL_FOOTER_SIZE]

if len(target_block) != footer_data.compressed_size:
Expand All @@ -96,10 +103,10 @@ def DecompressDVPL(buffer):
raise ValueError(Color.RED + "DVPLTypeSizeMismatch" + Color.RESET)
return target_block
elif footer_data.type == DVPL_TYPE_LZ4:
deDVPL_block = lz4.block.decompress(target_block, uncompressed_size=footer_data.original_size)
if len(deDVPL_block) != footer_data.original_size:
de_dvpl_block = lz4.block.decompress(target_block, uncompressed_size=footer_data.original_size)
if len(de_dvpl_block) != footer_data.original_size:
raise ValueError(Color.RED + "DVPLDecodeSizeMismatch" + Color.RESET)
return deDVPL_block
return de_dvpl_block
else:
raise ValueError(Color.RED + "UNKNOWN DVPL FORMAT" + Color.RESET)

Expand All @@ -115,16 +122,15 @@ def print_progress_bar(processed_files, total_files):
sys.stdout.flush()


from pathlib import Path

def count_total_files(directory):
total_files = 0
for path in Path(directory).rglob('*'):
if path.is_file():
total_files += 1
return total_files

def ConvertDVPLFiles(directory_or_file, config, total_files=None, processed_files=None):

def convert_dvpl(directory_or_file, config, total_files=None, processed_files=None):
if total_files is None:
total_files = count_total_files(directory_or_file)
if processed_files is None:
Expand All @@ -134,10 +140,13 @@ def ConvertDVPLFiles(directory_or_file, config, total_files=None, processed_file
failure_count = 0
ignored_count = 0

if not os.path.exists(directory_or_file):
raise FileNotFoundError(f"File or directory '{directory_or_file}' not found.")

if Path(directory_or_file).is_dir():
for file_path in Path(directory_or_file).rglob('*'):
if file_path.is_file():
succ, fail, ignored = ConvertDVPLFiles(str(file_path), config, total_files, processed_files)
succ, fail, ignored = convert_dvpl(str(file_path), config, total_files, processed_files)
success_count += succ
failure_count += fail
ignored_count += ignored
Expand All @@ -154,26 +163,38 @@ def ConvertDVPLFiles(directory_or_file, config, total_files=None, processed_file
if not should_ignore and (is_decompression or is_compression):
file_path = directory_or_file
try:
with open(file_path, "rb") as f:
file_data = f.read()

if is_compression:
processed_block = CompressDVPL(file_data)
new_name = file_path + ".dvpl"
# Check if the file exists before attempting to open it
if os.path.exists(file_path):
with open(file_path, "rb") as f:
file_data = f.read()

if is_compression:
if config.compression == "fast":
processed_block = compress_dvpl(file_data, "fast")
elif config.compression == "hc":
processed_block = compress_dvpl(file_data, "hc")
else:
processed_block = compress_dvpl(file_data)
new_name = file_path + ".dvpl"
else:
processed_block = decompress_dvpl(file_data)
new_name = os.path.splitext(file_path)[0]

with open(new_name, "wb") as f:
f.write(processed_block)

if not config.keep_originals:
os.remove(file_path)

success_count += 1
if config.verbose:
with output_lock:
print(f"{Color.GREEN}\nFile{Color.RESET} {file_path} has been successfully {Color.GREEN}{'compressed' if is_compression else 'decompressed'}{Color.RESET} into {Color.GREEN}{new_name}{Color.RESET}")
else:
processed_block = DecompressDVPL(file_data)
new_name = os.path.splitext(file_path)[0]

with open(new_name, "wb") as f:
f.write(processed_block)

if not config.keep_originals:
os.remove(file_path)

success_count += 1
if config.verbose:
with output_lock:
print(f"{Color.GREEN}\nFile{Color.RESET} {file_path} has been successfully {Color.GREEN}{'compressed' if is_compression else 'decompressed'}{Color.RESET} into {Color.GREEN}{new_name}{Color.RESET}")
if config.verbose:
with output_lock:
print(f"{Color.RED}\nError{Color.RESET}: File {file_path} does not exist.")
failure_count += 1
except Exception as e:
failure_count += 1
if config.verbose:
Expand All @@ -187,7 +208,8 @@ def ConvertDVPLFiles(directory_or_file, config, total_files=None, processed_file

return success_count, failure_count, ignored_count

def VerifyDVPLFiles(directory_or_file, config, total_files=None, processed_files=None):

def verify_dvpl(directory_or_file, config, total_files=None, processed_files=None):
if total_files is None:
total_files = count_total_files(directory_or_file)
if processed_files is None:
Expand All @@ -197,10 +219,13 @@ def VerifyDVPLFiles(directory_or_file, config, total_files=None, processed_files
failure_count = 0
ignored_count = 0

if not os.path.exists(directory_or_file):
raise FileNotFoundError(f"File or directory '{directory_or_file}' not found.")

if Path(directory_or_file).is_dir():
for file_path in Path(directory_or_file).rglob('*'):
if file_path.is_file() and file_path.suffix == '.dvpl':
succ, fail, ignored = VerifyDVPLFiles(str(file_path), config, total_files, processed_files)
succ, fail, ignored = verify_dvpl(str(file_path), config, total_files, processed_files)
success_count += succ
failure_count += fail
ignored_count += ignored
Expand All @@ -219,7 +244,7 @@ def VerifyDVPLFiles(directory_or_file, config, total_files=None, processed_files
with open(file_path, "rb") as f:
file_data = f.read()

footer_data = readDVPLFooter(file_data)
footer_data = read_dvpl_footer(file_data)

target_block = file_data[:-DVPL_FOOTER_SIZE]

Expand All @@ -233,8 +258,8 @@ def VerifyDVPLFiles(directory_or_file, config, total_files=None, processed_files
if footer_data.original_size != footer_data.compressed_size or footer_data.type != DVPL_TYPE_NONE:
raise ValueError(Color.RED + "DVPLTypeSizeMismatch" + Color.RESET)
elif footer_data.type == DVPL_TYPE_LZ4:
deDVPL_block = lz4.block.decompress(target_block, uncompressed_size=footer_data.original_size)
if len(deDVPL_block) != footer_data.original_size:
de_dvpl_block = lz4.block.decompress(target_block, uncompressed_size=footer_data.original_size)
if len(de_dvpl_block) != footer_data.original_size:
raise ValueError(Color.RED + "DVPLDecodeSizeMismatch" + Color.RESET)
else:
raise ValueError(Color.RED + "UNKNOWN DVPL FORMAT" + Color.RESET)
Expand All @@ -259,19 +284,20 @@ def VerifyDVPLFiles(directory_or_file, config, total_files=None, processed_files
return success_count, failure_count, ignored_count


def process_func(directory_or_file, config, total_files, processed_files):

def process_mode(directory_or_file, config):
if config.mode in ["compress", "decompress"]:
return ConvertDVPLFiles(directory_or_file, config)
return convert_dvpl(directory_or_file, config)
elif config.mode == "verify":
return VerifyDVPLFiles(directory_or_file, config)
return verify_dvpl(directory_or_file, config)
elif config.mode == "help":
PrintHelpMessage()
return (0, 0, 0) # Return (0, 0, 0) when in help mode
print_help_message()
return 0, 0, 0
else:
raise ValueError("Incorrect mode selected. Use '--help' for information.")


def ParseCommandLineArgs():
def parse_command_line_args():
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--mode",
help="mode can be 'c' or 'compress' / 'd' or 'decompress' / 'v' or 'verify' / 'h' or 'help' (for an extended help guide).")
Expand All @@ -284,6 +310,8 @@ def ParseCommandLineArgs():
help="Comma-separated list of file extensions to ignore during compression.")
parser.add_argument("-t", "--threads", type=int, default=1,
help="Number of threads to use for processing. Default is 1.")
parser.add_argument("-c", "--compression", choices=['default', 'fast', 'hc'],
help="Select compression level: 'default' for default compression, 'fast' for fast compression, 'hc' for high compression.")
args = parser.parse_args()

if not args.mode:
Expand All @@ -300,14 +328,14 @@ def ParseCommandLineArgs():
'h': 'help'
}

# If mode argument is provided and it matches a short form, replace it with the full mode name
# If mode argument is provided, and it matches a short form, replace it with the full mode name
if args.mode in mode_mapping:
args.mode = mode_mapping[args.mode]

return args


def PrintHelpMessage():
def print_help_message():
print('''$ pydvpl [--mode] [--keep-originals] [--path] [--verbose] [--ignore] [--threads]
• flags can be one of the following:
Expand Down Expand Up @@ -344,7 +372,7 @@ def PrintHelpMessage():
$ pydvpl --mode decompress --keep-originals --path /path/to/decompress/compress.yaml.dvpl
$ pydvpl --mode dcompress --keep-originals --path /path/to/decompress/compress.yaml
$ pydvpl --mode decompress --keep-originals --path /path/to/decompress/compress.yaml
$ pydvpl --mode compress --path /path/to/decompress --ignore .exe,.dll
Expand All @@ -359,10 +387,14 @@ def PrintHelpMessage():
$ pydvpl --mode decompress --path /path/to/decompress/compress.yaml.dvpl --threads 10
$ pydvpl --mode compress --path /path/to/decompress/compress.yaml --threads 10
$ pydvpl --mode compress --path /path/to/decompress/compress.yaml --compression hc
$ pydvpl --mode compress --path /path/to/decompress/ --compression fast
''')


def PrintElapsedTime(elapsed_time):
def print_elapsed_time(elapsed_time):
if elapsed_time < 1:
print(f"\nProcessing took {Color.GREEN}{int(elapsed_time * 1000)} ms{Color.RESET}\n")
elif elapsed_time < 60:
Expand All @@ -380,19 +412,14 @@ def main():
print(f"{Color.BLUE}• Info:{Color.RESET} {Meta.INFO}\n")

start_time = time.time()
config = ParseCommandLineArgs()
config = parse_command_line_args()

if config.threads <= 0:
print(f"\n{Color.YELLOW}No threads specified.{Color.RESET} No processing will be done.\n")
return

total_files = count_total_files(config.path)
manager = multiprocessing.Manager()
processed_files = manager.Value('i', 0) # Define processed_files using a Manager

try:
process_func_partial = partial(process_func, config=config, total_files=total_files,
processed_files=processed_files)
process_func_partial = partial(process_mode, config=config)

if config.threads > 1:
with multiprocessing.Pool(config.threads) as pool:
Expand All @@ -414,8 +441,8 @@ def main():
print(f"\n\n{Color.RED}{config.mode.upper()} FAILED{Color.RESET}: {e}\n")

elapsed_time = time.time() - start_time
PrintElapsedTime(elapsed_time)
print_elapsed_time(elapsed_time)


if __name__ == "__main__":
main()
main()

0 comments on commit 169e079

Please sign in to comment.