commit ba01d5de1661885045e93728913c019b9449927a Author: Sergey Elpashev Date: Fri Jun 28 13:17:45 2024 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..19c8cd5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +output +*.npk +venv \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..e34fb1e --- /dev/null +++ b/main.py @@ -0,0 +1,315 @@ +import hashlib +import io +import os +import struct +import sys + +import zstandard +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad + + +class StructStream: + def __init__(self, big_endian=False, encoding=None): + self.big_endian = big_endian + self.encoding = encoding or 'utf-8' + + def read_struct(self, stream, struct_fmt): + fmt = struct_fmt + if self.big_endian: + fmt = '>' + fmt + size = struct.calcsize(fmt) + data = stream.read(size) + return struct.unpack(fmt, data)[0] + + def write_struct(self, stream, struct_fmt, value): + fmt = struct_fmt + if self.big_endian: + fmt = '>' + fmt + data = struct.pack(fmt, value) + stream.write(data) + + def read_string(self, stream, length=None, encoding=None): + encoding = encoding or self.encoding + if length is None: + data = bytearray() + while True: + byte = stream.read(1) + if byte == b'\x00': + break + data += byte + return data.decode(encoding) + else: + data = stream.read(length) + return data.decode(encoding, errors='ignore').rstrip('\x00') + + def write_string(self, stream, value, encoding=None): + encoding = encoding or self.encoding + data = value.encode(encoding) + stream.write(data) + +class NPK3Tool: + + def __init__(self, key, iv=None): + self.key = key + self.iv = iv or b'\x42\x79\x20\x4D\x61\x72\x63\x75\x73\x73\x61\x63\x61\x6e\x61\x00' + self.enable_compression = True + self.enable_segmentation = True + self.force_segmentation = False + self.npk_version = 3 + self.npk_minor_version = 1 + self.max_section_size = 0x10000 + self.encoding = 'utf-8' + self.dont_compress = [".png", ".ogg", ".jpg", ".mpg"] + self.struct_stream = StructStream(encoding=self.encoding) + + def unpack(self, package_path, output_dir=None): + if output_dir is None: + output_dir = os.path.splitext(package_path)[0] + "~" + os.makedirs(output_dir, exist_ok=True) + + with open(package_path, 'rb') as f: + magic = f.read(4) + print(magic) + if magic == b'NPK3': + self.npk_version = 3 + elif magic == b'NPK2': + self.npk_version = 2 + else: + raise ValueError("Unsupported NPK version: {}".format(magic)) + + f.seek(8) + self.iv = f.read(16) + print(self.iv) + entry_table = self.get_entry_table(f) + entries = self.get_entries(entry_table) + + for entry in entries: + print(f"Extracting File: {entry['FilePath']}") + output_path = os.path.join(output_dir, entry['FilePath'].replace('/', os.path.sep)) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + with open(output_path, 'wb') as outfile: + for segment in entry['SegmentsInfo']: + offset = segment['Offset'] + aligned_size = segment['AlignedSize'] + f.seek(offset) + data = f.read(aligned_size) + decrypted_data = self.decrypt(data) + if segment['RealSize'] < segment['DecompressedSize']: + decompressed_data = self.decompress(decrypted_data) + outfile.write(decompressed_data) + else: + outfile.write(decrypted_data) + + def repack(self, input_dir, output_path=None): + if output_path is None: + output_path = os.path.splitext(input_dir)[0] + "_New.npk" + input_dir = os.path.abspath(input_dir) + + file_paths = [] + relative_files = [] + for root, _, files in os.walk(input_dir): + for file in files: + file_path = os.path.join(root, file) + file_paths.append(file_path) + relative_files.append(os.path.relpath(file_path, input_dir).replace(os.path.sep, '/')) + + entries = self.create_initial_entries(relative_files, file_paths) + + with open(output_path, 'wb') as f: + f.write(b'NPK3' if self.npk_version == 3 else b'NPK2') + f.write(struct.pack('I', self.npk_minor_version)) + f.write(self.iv) + f.write(struct.pack('I', len(file_paths))) + + table_data = self.build_entries(entries) + encrypted_table = self.encrypt(table_data) + f.write(struct.pack('I', len(encrypted_table))) + + table_pos = f.tell() + f.seek(table_pos + len(encrypted_table)) + + for i, entry in enumerate(entries): + print(f"Writing File: {entry['FilePath']}") + file_path = file_paths[i] + ext = os.path.splitext(file_path).lower() + compress = self.enable_compression and ext not in self.dont_compress + + read_pos = 0 + for x, segment in enumerate(entry['SegmentsInfo']): + with open(file_path, 'rb') as infile: + infile.seek(read_pos) + segment_data = infile.read(segment['DecompressedSize']) + read_pos += len(segment_data) + if compress: + compressed_data = self.compress(segment_data) + if len(compressed_data) < len(segment_data): + segment_data = compressed_data + encrypted_data = self.encrypt(segment_data) + entry['SegmentsInfo'][x]['RealSize'] = len(segment_data) + entry['SegmentsInfo'][x]['AlignedSize'] = len(encrypted_data) + entry['SegmentsInfo'][x]['Offset'] = f.tell() + f.write(encrypted_data) + + f.seek(table_pos) + print("Writing File Index...") + f.write(encrypted_table) + + def get_entries(self, entry_table): + entries = [] + with io.BytesIO(entry_table) as f: + while f.tell() < len(entry_table): + entry = {} + entry['SegmentationMode'] = self.struct_stream.read_struct(f, 'B') + entry['FileNameSize'] = self.struct_stream.read_struct(f, 'H') + entry['FilePath'] = self.struct_stream.read_string(f, entry['FileNameSize']) + entry['FileSize'] = self.struct_stream.read_struct(f, 'I') + entry['SHA256'] = f.read(0x20) + entry['SegmentCount'] = self.struct_stream.read_struct(f, 'I') + entry['SegmentsInfo'] = [] + for _ in range(entry['SegmentCount']): + segment = {} + segment['Offset'] = self.struct_stream.read_struct(f, 'Q') + segment['AlignedSize'] = self.struct_stream.read_struct(f, 'I') + segment['RealSize'] = self.struct_stream.read_struct(f, 'I') + segment['DecompressedSize'] = self.struct_stream.read_struct(f, 'I') + entry['SegmentsInfo'].append(segment) + entries.append(entry) + return entries + + def get_entry_table(self, package_file): + package_file.seek(0x1C) + table_size = self.struct_stream.read_struct(package_file, 'I') + package_file.seek(0x20) + encrypted_table = package_file.read(table_size) + return self.decrypt(encrypted_table) + + def build_entries(self, entries): + with io.BytesIO() as f: + for entry in entries: + self.struct_stream.write_struct(f, 'B', entry['SegmentationMode']) + self.struct_stream.write_struct(f, 'H', len(entry['FilePath'])) + self.struct_stream.write_string(f, entry['FilePath']) + self.struct_stream.write_struct(f, 'I', entry['FileSize']) + f.write(entry['SHA256']) + self.struct_stream.write_struct(f, 'I', len(entry['SegmentsInfo'])) + for segment in entry['SegmentsInfo']: + self.struct_stream.write_struct(f, 'Q', segment['Offset']) + self.struct_stream.write_struct(f, 'I', segment['AlignedSize']) + self.struct_stream.write_struct(f, 'I', segment['RealSize']) + self.struct_stream.write_struct(f, 'I', segment['DecompressedSize']) + return f.getvalue() + + def create_initial_entries(self, relative_files, file_paths): + print("Loading Files...") + entries = [] + for i, file_path in enumerate(file_paths): + with open(file_path, 'rb') as f: + data = f.read() + entry = {} + entry['FilePath'] = relative_files[i] + entry['FileSize'] = len(data) + entry['SHA256'] = hashlib.sha256(data).digest() + remaining = entry['FileSize'] + if self.enable_segmentation or remaining > 2 ** 32 - 1 or self.force_segmentation: + if not self.enable_segmentation: + print( + f"Big File Detected: '{entry['FilePath']}', Enforcing Segmentation", + file=sys.stderr, + ) + entry['SegmentsInfo'] = [] + entry['SegmentationMode'] = 0 if len(entry['SegmentsInfo']) > 1 else 1 + if self.force_segmentation: + entry['SegmentationMode'] = 0 + while remaining > 0: + max_bytes = min(remaining, self.max_section_size) + segment = {} + segment['Offset'] = 0 + segment['DecompressedSize'] = max_bytes + segment['RealSize'] = max_bytes + segment['AlignedSize'] = (max_bytes + 0xF) & (~0xF) + entry['SegmentsInfo'].append(segment) + remaining -= max_bytes + else: + entry['SegmentationMode'] = 1 + segment = {} + segment['Offset'] = 0 + segment['DecompressedSize'] = remaining + segment['RealSize'] = remaining + segment['AlignedSize'] = (remaining + 0xF) & (~0xF) + entry['SegmentsInfo'] = [segment] + entries.append(entry) + return entries + + def encrypt(self, data): + cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + return cipher.encrypt(pad(data, AES.block_size)) + + def decrypt(self, data): + cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + return unpad(cipher.decrypt(data), AES.block_size) + + def compress(self, data): + if self.npk_version == 3: + return zstandard.compress(data) + elif self.npk_version == 2: + import zlib + return zlib.compress(data, level=9) + else: + raise ValueError("Unsupported NPK version: {}".format(self.npk_version)) + + def decompress(self, data): + if self.npk_version == 3: + return zstandard.decompress(data) + elif self.npk_version == 2: + import zlib + try: + # import gzip + # return gzip.decompress(data) + return zlib.decompress(data, -zlib.MAX_WBITS) + except zlib.error as e: + print(f"Zlib error: {e}", file=sys.stderr) + else: + raise ValueError("Unsupported NPK version: {}".format(self.npk_version)) + + +def main(): + if len(sys.argv) < 3: + print(f"Usage: {sys.argv[0]} [options] ") + print("Options:") + print(" -u Unpack NPK archive") + print(" -r Repack directory to NPK archive") + print(" -key Set encryption key (hexadecimal)") + print(" -iv Set initialization vector (hexadecimal)") + sys.exit(1) + + input_path = sys.argv[-2] + output_path = sys.argv[-1] + + key = None + iv = None + + for i in range(1, len(sys.argv) - 2): + print(sys.argv[i]) + if sys.argv[i] == '-key': + key = bytes.fromhex(sys.argv[i + 1]) + elif sys.argv[i] == '-iv': + iv = bytes.fromhex(sys.argv[i + 1]) + + if key is None: + print("Error: Encryption key is required", file=sys.stderr) + sys.exit(1) + + tool = NPK3Tool(key, iv) + + if sys.argv[1] == '-u': + tool.unpack(input_path, output_path) + elif sys.argv[1] == '-r': + tool.repack(input_path, output_path) + else: + print(f"Error: Invalid operation: {sys.argv[1]}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cf0ea9a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +pycryptodome +zstandard