import hashlib import io import os import struct import sys import zstandard from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad class StructStream: def __init__(self, big_endian=False, encoding=None): self.big_endian = big_endian self.encoding = encoding or 'utf-8' def read_struct(self, stream, struct_fmt): fmt = struct_fmt if self.big_endian: fmt = '>' + fmt size = struct.calcsize(fmt) data = stream.read(size) return struct.unpack(fmt, data)[0] def write_struct(self, stream, struct_fmt, value): fmt = struct_fmt if self.big_endian: fmt = '>' + fmt data = struct.pack(fmt, value) stream.write(data) def read_string(self, stream, length=None, encoding=None): encoding = encoding or self.encoding if length is None: data = bytearray() while True: byte = stream.read(1) if byte == b'\x00': break data += byte return data.decode(encoding) else: data = stream.read(length) return data.decode(encoding, errors='ignore').rstrip('\x00') def write_string(self, stream, value, encoding=None): encoding = encoding or self.encoding data = value.encode(encoding) stream.write(data) class NPK3Tool: def __init__(self, key, iv=None): self.key = key self.iv = iv or b'\x42\x79\x20\x4D\x61\x72\x63\x75\x73\x73\x61\x63\x61\x6e\x61\x00' self.enable_compression = True self.enable_segmentation = True self.force_segmentation = False self.npk_version = 3 self.npk_minor_version = 1 self.max_section_size = 0x10000 self.encoding = 'utf-8' self.dont_compress = [".png", ".ogg", ".jpg", ".mpg"] self.struct_stream = StructStream(encoding=self.encoding) def unpack(self, package_path, output_dir=None): if output_dir is None: output_dir = os.path.splitext(package_path)[0] + "~" os.makedirs(output_dir, exist_ok=True) with open(package_path, 'rb') as f: magic = f.read(4) print(magic) if magic == b'NPK3': self.npk_version = 3 elif magic == b'NPK2': self.npk_version = 2 else: raise ValueError("Unsupported NPK version: {}".format(magic)) f.seek(8) self.iv = f.read(16) print(self.iv) entry_table = self.get_entry_table(f) entries = self.get_entries(entry_table) for entry in entries: print(f"Extracting File: {entry['FilePath']}") output_path = os.path.join(output_dir, entry['FilePath'].replace('/', os.path.sep)) os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'wb') as outfile: for segment in entry['SegmentsInfo']: offset = segment['Offset'] aligned_size = segment['AlignedSize'] f.seek(offset) data = f.read(aligned_size) decrypted_data = self.decrypt(data) if segment['RealSize'] < segment['DecompressedSize']: decompressed_data = self.decompress(decrypted_data) outfile.write(decompressed_data) else: outfile.write(decrypted_data) def repack(self, input_dir, output_path=None): if output_path is None: output_path = os.path.splitext(input_dir)[0] + "_New.npk" input_dir = os.path.abspath(input_dir) file_paths = [] relative_files = [] for root, _, files in os.walk(input_dir): for file in files: file_path = os.path.join(root, file) file_paths.append(file_path) relative_files.append(os.path.relpath(file_path, input_dir).replace(os.path.sep, '/')) entries = self.create_initial_entries(relative_files, file_paths) with open(output_path, 'wb') as f: f.write(b'NPK3' if self.npk_version == 3 else b'NPK2') f.write(struct.pack('I', self.npk_minor_version)) f.write(self.iv) f.write(struct.pack('I', len(file_paths))) table_data = self.build_entries(entries) encrypted_table = self.encrypt(table_data) f.write(struct.pack('I', len(encrypted_table))) table_pos = f.tell() f.seek(table_pos + len(encrypted_table)) for i, entry in enumerate(entries): print(f"Writing File: {entry['FilePath']}") file_path = file_paths[i] ext = os.path.splitext(file_path).lower() compress = self.enable_compression and ext not in self.dont_compress read_pos = 0 for x, segment in enumerate(entry['SegmentsInfo']): with open(file_path, 'rb') as infile: infile.seek(read_pos) segment_data = infile.read(segment['DecompressedSize']) read_pos += len(segment_data) if compress: compressed_data = self.compress(segment_data) if len(compressed_data) < len(segment_data): segment_data = compressed_data encrypted_data = self.encrypt(segment_data) entry['SegmentsInfo'][x]['RealSize'] = len(segment_data) entry['SegmentsInfo'][x]['AlignedSize'] = len(encrypted_data) entry['SegmentsInfo'][x]['Offset'] = f.tell() f.write(encrypted_data) f.seek(table_pos) print("Writing File Index...") f.write(encrypted_table) def get_entries(self, entry_table): entries = [] with io.BytesIO(entry_table) as f: while f.tell() < len(entry_table): entry = {} entry['SegmentationMode'] = self.struct_stream.read_struct(f, 'B') entry['FileNameSize'] = self.struct_stream.read_struct(f, 'H') entry['FilePath'] = self.struct_stream.read_string(f, entry['FileNameSize']) entry['FileSize'] = self.struct_stream.read_struct(f, 'I') entry['SHA256'] = f.read(0x20) entry['SegmentCount'] = self.struct_stream.read_struct(f, 'I') entry['SegmentsInfo'] = [] for _ in range(entry['SegmentCount']): segment = {} segment['Offset'] = self.struct_stream.read_struct(f, 'Q') segment['AlignedSize'] = self.struct_stream.read_struct(f, 'I') segment['RealSize'] = self.struct_stream.read_struct(f, 'I') segment['DecompressedSize'] = self.struct_stream.read_struct(f, 'I') entry['SegmentsInfo'].append(segment) entries.append(entry) return entries def get_entry_table(self, package_file): package_file.seek(0x1C) table_size = self.struct_stream.read_struct(package_file, 'I') package_file.seek(0x20) encrypted_table = package_file.read(table_size) return self.decrypt(encrypted_table) def build_entries(self, entries): with io.BytesIO() as f: for entry in entries: self.struct_stream.write_struct(f, 'B', entry['SegmentationMode']) self.struct_stream.write_struct(f, 'H', len(entry['FilePath'])) self.struct_stream.write_string(f, entry['FilePath']) self.struct_stream.write_struct(f, 'I', entry['FileSize']) f.write(entry['SHA256']) self.struct_stream.write_struct(f, 'I', len(entry['SegmentsInfo'])) for segment in entry['SegmentsInfo']: self.struct_stream.write_struct(f, 'Q', segment['Offset']) self.struct_stream.write_struct(f, 'I', segment['AlignedSize']) self.struct_stream.write_struct(f, 'I', segment['RealSize']) self.struct_stream.write_struct(f, 'I', segment['DecompressedSize']) return f.getvalue() def create_initial_entries(self, relative_files, file_paths): print("Loading Files...") entries = [] for i, file_path in enumerate(file_paths): with open(file_path, 'rb') as f: data = f.read() entry = {} entry['FilePath'] = relative_files[i] entry['FileSize'] = len(data) entry['SHA256'] = hashlib.sha256(data).digest() remaining = entry['FileSize'] if self.enable_segmentation or remaining > 2 ** 32 - 1 or self.force_segmentation: if not self.enable_segmentation: print( f"Big File Detected: '{entry['FilePath']}', Enforcing Segmentation", file=sys.stderr, ) entry['SegmentsInfo'] = [] entry['SegmentationMode'] = 0 if len(entry['SegmentsInfo']) > 1 else 1 if self.force_segmentation: entry['SegmentationMode'] = 0 while remaining > 0: max_bytes = min(remaining, self.max_section_size) segment = {} segment['Offset'] = 0 segment['DecompressedSize'] = max_bytes segment['RealSize'] = max_bytes segment['AlignedSize'] = (max_bytes + 0xF) & (~0xF) entry['SegmentsInfo'].append(segment) remaining -= max_bytes else: entry['SegmentationMode'] = 1 segment = {} segment['Offset'] = 0 segment['DecompressedSize'] = remaining segment['RealSize'] = remaining segment['AlignedSize'] = (remaining + 0xF) & (~0xF) entry['SegmentsInfo'] = [segment] entries.append(entry) return entries def encrypt(self, data): cipher = AES.new(self.key, AES.MODE_CBC, self.iv) return cipher.encrypt(pad(data, AES.block_size)) def decrypt(self, data): cipher = AES.new(self.key, AES.MODE_CBC, self.iv) return unpad(cipher.decrypt(data), AES.block_size) def compress(self, data): if self.npk_version == 3: return zstandard.compress(data) elif self.npk_version == 2: import zlib return zlib.compress(data, level=9) else: raise ValueError("Unsupported NPK version: {}".format(self.npk_version)) def decompress(self, data): if self.npk_version == 3: return zstandard.decompress(data) elif self.npk_version == 2: import zlib try: # import gzip # return gzip.decompress(data) return zlib.decompress(data, -zlib.MAX_WBITS) except zlib.error as e: print(f"Zlib error: {e}", file=sys.stderr) else: raise ValueError("Unsupported NPK version: {}".format(self.npk_version)) def main(): if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} [options] ") print("Options:") print(" -u Unpack NPK archive") print(" -r Repack directory to NPK archive") print(" -key Set encryption key (hexadecimal)") print(" -iv Set initialization vector (hexadecimal)") sys.exit(1) input_path = sys.argv[-2] output_path = sys.argv[-1] key = None iv = None for i in range(1, len(sys.argv) - 2): print(sys.argv[i]) if sys.argv[i] == '-key': key = bytes.fromhex(sys.argv[i + 1]) elif sys.argv[i] == '-iv': iv = bytes.fromhex(sys.argv[i + 1]) if key is None: print("Error: Encryption key is required", file=sys.stderr) sys.exit(1) tool = NPK3Tool(key, iv) if sys.argv[1] == '-u': tool.unpack(input_path, output_path) elif sys.argv[1] == '-r': tool.repack(input_path, output_path) else: print(f"Error: Invalid operation: {sys.argv[1]}", file=sys.stderr) sys.exit(1) if __name__ == '__main__': main()