36 changes: 20 additions & 16 deletions UnityPy/files/BundleFile.py
@@ -1,7 +1,7 @@
# TODO: implement encryption for saving files
import re
from collections import namedtuple
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, Union, cast

from .. import config
from ..enums import ArchiveFlags, ArchiveFlagsOld, CompressionFlags
@@ -351,12 +351,12 @@ def save_fs(self, writer: EndianBinaryWriter, data_flag: int, block_info_flag: i
uncompressed_block_data_size = len(block_data)

switch = data_flag & 0x3F
if switch == 1: # LZMA
block_data = CompressionHelper.compress_lzma(block_data)
elif switch in [2, 3]: # LZ4, LZ4HC
block_data = CompressionHelper.compress_lz4(block_data)
elif switch == 4: # LZHAM
raise NotImplementedError
if switch in CompressionHelper.COMPRESSION_MAP:
block_data = CompressionHelper.COMPRESSION_MAP[switch](block_data)
else:
raise NotImplementedError(
f"No compression function in the CompressionHelper.COMPRESSION_MAP for {switch}"
)

compressed_block_data_size = len(block_data)

@@ -525,16 +525,20 @@ def decompress_data(
The decompressed data."""
comp_flag = CompressionFlags(flags & ArchiveFlags.CompressionTypeMask)

if comp_flag == CompressionFlags.LZMA: # LZMA
return CompressionHelper.decompress_lzma(compressed_data)
elif comp_flag in [CompressionFlags.LZ4, CompressionFlags.LZ4HC]: # LZ4, LZ4HC
if self.decryptor is not None and flags & 0x100:
compressed_data = self.decryptor.decrypt_block(compressed_data, index)
return CompressionHelper.decompress_lz4(compressed_data, uncompressed_size)
elif comp_flag == CompressionFlags.LZHAM: # LZHAM
raise NotImplementedError("LZHAM decompression not implemented")
if self.decryptor is not None and flags & 0x100:
compressed_data = self.decryptor.decrypt_block(compressed_data, index)

if comp_flag in CompressionHelper.DECOMPRESSION_MAP:
return cast(
bytes,
CompressionHelper.DECOMPRESSION_MAP[comp_flag](
compressed_data, uncompressed_size
),
)
else:
return compressed_data
raise ValueError(
f"Unknown compression! flag: {flags}, compression flag: {comp_flag.value}"
)

def get_version_tuple(self) -> Tuple[int, int, int]:
"""Returns the version as a tuple."""
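Both BundleFile.py hunks replace hard-coded if/elif compression branches with lookups into the new dispatch tables from CompressionHelper. A minimal, self-contained sketch of that pattern (the fake_* functions are stand-ins for the real compressors, and plain int keys stand in for the CompressionFlags members used by the actual maps), not the library code itself:

```python
from typing import Callable, Dict, Union

ByteString = Union[bytes, bytearray, memoryview]

# Toy stand-ins for the real CompressionHelper functions, so the sketch runs on its own.
def fake_lzma(data: ByteString) -> bytes:
    return b"LZMA:" + bytes(data)

def fake_lz4(data: ByteString) -> bytes:
    return b"LZ4:" + bytes(data)

COMPRESSION_MAP: Dict[int, Callable[[ByteString], bytes]] = {
    0: lambda d: bytes(d),  # NONE
    1: fake_lzma,           # LZMA
    2: fake_lz4,            # LZ4
    3: fake_lz4,            # LZ4HC
}

def compress_block(block_data: bytes, data_flag: int) -> bytes:
    switch = data_flag & 0x3F  # low six bits select the compression type
    if switch in COMPRESSION_MAP:
        return COMPRESSION_MAP[switch](block_data)
    raise NotImplementedError(
        f"No compression function in the CompressionHelper.COMPRESSION_MAP for {switch}"
    )

print(compress_block(b"hello", 2))  # b'LZ4:hello'
```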
167 changes: 120 additions & 47 deletions UnityPy/helpers/CompressionHelper.py
@@ -1,21 +1,25 @@
import gzip
import lzma
import struct
from typing import Tuple
from typing import Tuple, Union, Callable, Dict

import brotli
import lz4.block

from ..enums.BundleFile import CompressionFlags


ByteString = Union[bytes, bytearray, memoryview]
GZIP_MAGIC: bytes = b"\x1f\x8b"
BROTLI_MAGIC: bytes = b"brotli"


# LZMA
def decompress_lzma(data: bytes, read_decompressed_size: bool = False) -> bytes:
def decompress_lzma(data: ByteString, read_decompressed_size: bool = False) -> bytes:
"""decompresses lzma-compressed data

:param data: compressed data
:type data: bytes
:type data: ByteString
:raises _lzma.LZMAError: Compressed data ended before the end-of-stream marker was reached
:return: uncompressed data
:rtype: bytes
@@ -41,13 +45,13 @@ def decompress_lzma(data: bytes, read_decompressed_size: bool = False) -> bytes:
return dec.decompress(data[data_offset:])
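For reference, the raw-LZMA header that decompress_lzma consumes appears to be a single properties byte followed by a little-endian 32-bit dictionary size; a short sketch of decoding it (parse_lzma_props is a hypothetical helper, not part of UnityPy, and the header layout is an assumption about the Unity format):

```python
import struct

def parse_lzma_props(header: bytes):
    """Decode the assumed 5-byte Unity LZMA header: props byte + LE dict size."""
    props, dict_size = struct.unpack("<BI", header[:5])
    lc = props % 9          # literal context bits
    lp = (props // 9) % 5   # literal position bits
    pb = props // 45        # position bits
    return lc, lp, pb, dict_size

print(parse_lzma_props(b"\x5d\x00\x00\x01\x00"))  # (3, 0, 2, 65536), the usual defaults
```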


def compress_lzma(data: bytes, write_decompressed_size: bool = False) -> bytes:
def compress_lzma(data: ByteString, write_decompressed_size: bool = False) -> bytes:
"""compresses data via lzma (unity specific)
The current static settings may not be the best solution,
but they are the most commonly used values and should therefore be enough for the time being.

:param data: uncompressed data
:type data: bytes
:type data: ByteString
:return: compressed data
:rtype: bytes
"""
@@ -77,11 +81,11 @@ def compress_lzma(data: bytes, write_decompressed_size: bool = False) -> bytes:


# LZ4
def decompress_lz4(data: bytes, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
def decompress_lz4(data: ByteString, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
"""decompresses lz4-compressed data

:param data: compressed data
:type data: bytes
:type data: ByteString
:param uncompressed_size: size of the uncompressed data
:type uncompressed_size: int
:raises _block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer.
@@ -91,11 +95,11 @@ def decompress_lz4(data: bytes, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
return lz4.block.decompress(data, uncompressed_size)


def compress_lz4(data: bytes) -> bytes: # LZ4M/LZ4HC
def compress_lz4(data: ByteString) -> bytes: # LZ4M/LZ4HC
"""compresses data via lz4.block

:param data: uncompressed data
:type data: bytes
:type data: ByteString
:return: compressed data
:rtype: bytes
"""
@@ -105,77 +109,88 @@ def compress_lz4(data: bytes) -> bytes: # LZ4M/LZ4HC


# Brotli
def decompress_brotli(data: bytes) -> bytes:
def decompress_brotli(data: ByteString) -> bytes:
"""decompresses brotli-compressed data

:param data: compressed data
:type data: bytes
:type data: ByteString
:raises brotli.error: BrotliDecompress failed
:return: uncompressed data
:rtype: bytes
"""
return brotli.decompress(data)


def compress_brotli(data: bytes) -> bytes:
def compress_brotli(data: ByteString) -> bytes:
"""compresses data via brotli

:param data: uncompressed data
:type data: bytes
:type data: ByteString
:return: compressed data
:rtype: bytes
"""
return brotli.compress(data)


# GZIP
def decompress_gzip(data: bytes) -> bytes:
def decompress_gzip(data: ByteString) -> bytes:
"""decompresses gzip-compressed data

:param data: compressed data
:type data: bytes
:type data: ByteString
:raises OSError: Not a gzipped file
:return: uncompressed data
:rtype: bytes
"""
return gzip.decompress(data)


def compress_gzip(data: bytes) -> bytes:
def compress_gzip(data: ByteString) -> bytes:
"""compresses data via gzip
The current static settings may not be the best solution,
but they are the most commonly used values and should therefore be enough for the time being.

:param data: uncompressed data
:type data: bytes
:type data: ByteString
:return: compressed data
:rtype: bytes
"""
return gzip.compress(data)


def chunk_based_compress(data: bytes, block_info_flag: int) -> Tuple[bytes, list]:
def chunk_based_compress(
data: ByteString, block_info_flag: int
) -> Tuple[ByteString, list]:
"""compresses AssetBundle data based on the block_info_flag
LZ4/LZ4HC will be chunk-based compression

:param data: uncompressed data
:type data: bytes
:type data: ByteString
:param block_info_flag: block info flag
:type block_info_flag: int
:return: compressed data and block info
:rtype: tuple
"""
switch = block_info_flag & 0x3F
chunk_size = None
compress_func = None
if switch == 0: # NONE
return data, [(len(data), len(data), block_info_flag)]
elif switch == 1: # LZMA
chunk_size = 0xFFFFFFFF
compress_func = compress_lzma
elif switch in [2, 3]: # LZ4
chunk_size = 0x00020000
compress_func = compress_lz4
elif switch == 4: # LZHAM
raise NotImplementedError

if switch in COMPRESSION_MAP:
compress_func = COMPRESSION_MAP[switch]
else:
raise NotImplementedError(
f"No compression function in the CompressionHelper.COMPRESSION_MAP for {switch}"
)

if switch in COMPRESSION_CHUNK_SIZE_MAP:
chunk_size = COMPRESSION_CHUNK_SIZE_MAP[switch]
else:
raise NotImplementedError(
f"No chunk size in the CompressionHelper.COMPRESSION_CHUNK_SIZE_MAP for {switch}"
)

block_info = []
uncompressed_data_size = len(data)
compressed_file_data = bytearray()
@@ -184,34 +199,92 @@ def chunk_based_compress(data: bytes, block_info_flag: int) -> Tuple[bytes, lis
compressed_data = compress_func(data[p : p + chunk_size])
if len(compressed_data) > chunk_size:
compressed_file_data.extend(data[p : p + chunk_size])
block_info.append((
chunk_size,
chunk_size,
block_info_flag ^ switch,
))
block_info.append(
(
chunk_size,
chunk_size,
block_info_flag ^ switch,
)
)
else:
compressed_file_data.extend(compressed_data)
block_info.append((
chunk_size,
len(compressed_data),
block_info_flag,
))
block_info.append(
(
chunk_size,
len(compressed_data),
block_info_flag,
)
)
p += chunk_size
uncompressed_data_size -= chunk_size
if uncompressed_data_size > 0:
compressed_data = compress_func(data[p:])
if len(compressed_data) > uncompressed_data_size:
compressed_file_data.extend(data[p:])
block_info.append((
uncompressed_data_size,
uncompressed_data_size,
block_info_flag ^ switch,
))
block_info.append(
(
uncompressed_data_size,
uncompressed_data_size,
block_info_flag ^ switch,
)
)
else:
compressed_file_data.extend(compressed_data)
block_info.append((
uncompressed_data_size,
len(compressed_data),
block_info_flag,
))
block_info.append(
(
uncompressed_data_size,
len(compressed_data),
block_info_flag,
)
)
return bytes(compressed_file_data), block_info
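
One detail worth noting in chunk_based_compress: when a chunk does not actually shrink, its raw bytes are stored instead and the per-block flag has its compression bits cleared via block_info_flag ^ switch. A small worked example of that flag arithmetic (the 0x40 upper bit is an arbitrary stand-in for a non-compression flag, not a documented value):

```python
block_info_flag = 0x40 | 0x03    # hypothetical upper flag bit plus LZ4HC (3)
switch = block_info_flag & 0x3F  # -> 3

# Chunk stored uncompressed: XOR clears exactly the compression bits,
# leaving any higher flag bits untouched.
stored_flag = block_info_flag ^ switch
assert stored_flag & 0x3F == 0                         # compression type is now NONE
assert stored_flag & ~0x3F == block_info_flag & ~0x3F  # other bits preserved
```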


def decompress_lzham(data: ByteString, uncompressed_size: int) -> bytes:
raise NotImplementedError(
"Custom compression or unimplemented LZHAM (removed by Unity) encountered!"
)


DECOMPRESSION_MAP: Dict[
Union[int, CompressionFlags], Callable[[ByteString, int], ByteString]
] = {
CompressionFlags.NONE: lambda cd, _ucs: cd,
CompressionFlags.LZMA: lambda cd, _ucs: decompress_lzma(cd),
CompressionFlags.LZ4: decompress_lz4,
CompressionFlags.LZ4HC: decompress_lz4,
CompressionFlags.LZHAM: decompress_lzham,
}

COMPRESSION_MAP: Dict[
Union[int, CompressionFlags], Callable[[ByteString], ByteString]
] = {
CompressionFlags.NONE: lambda cd: cd,
CompressionFlags.LZMA: compress_lzma,
CompressionFlags.LZ4: compress_lz4,
CompressionFlags.LZ4HC: compress_lz4,
}

COMPRESSION_CHUNK_SIZE_MAP: Dict[Union[int, CompressionFlags], int] = {
CompressionFlags.NONE: 0xFFFFFFFF,
CompressionFlags.LZMA: 0xFFFFFFFF,
CompressionFlags.LZ4: 0x00020000,
CompressionFlags.LZ4HC: 0x00020000,
}


__all__ = (
"compress_brotli",
"compress_gzip",
"compress_lz4",
"compress_lzma",
"decompress_brotli",
"decompress_gzip",
"decompress_lz4",
"decompress_lzma",
"decompress_lzham",
"chunk_based_compress",
"COMPRESSION_MAP",
"DECOMPRESSION_MAP",
"COMPRESSION_CHUNK_SIZE_MAP",
)
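
Since the three maps are module-level and exported via __all__, downstream code can presumably patch in its own codecs, for example replacing the raising LZHAM placeholder (my_lzham_decompress below is a hypothetical stub, not a real implementation, and this extension pattern is an assumption about intended use rather than documented behaviour):

```python
from UnityPy.enums.BundleFile import CompressionFlags
from UnityPy.helpers import CompressionHelper

# Hypothetical custom decoder; a real one would wrap an external LZHAM binding.
def my_lzham_decompress(data, uncompressed_size: int) -> bytes:
    raise RuntimeError("plug a real LZHAM implementation in here")

# Replace the placeholder entry that raises NotImplementedError by default.
CompressionHelper.DECOMPRESSION_MAP[CompressionFlags.LZHAM] = my_lzham_decompress
```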