Files
NPKExtractionTool/main.py
2024-06-28 13:17:45 +03:00

315 lines
12 KiB
Python

import hashlib
import io
import os
import struct
import sys
import zstandard
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
class StructStream:
def __init__(self, big_endian=False, encoding=None):
self.big_endian = big_endian
self.encoding = encoding or 'utf-8'
def read_struct(self, stream, struct_fmt):
fmt = struct_fmt
if self.big_endian:
fmt = '>' + fmt
size = struct.calcsize(fmt)
data = stream.read(size)
return struct.unpack(fmt, data)[0]
def write_struct(self, stream, struct_fmt, value):
fmt = struct_fmt
if self.big_endian:
fmt = '>' + fmt
data = struct.pack(fmt, value)
stream.write(data)
def read_string(self, stream, length=None, encoding=None):
encoding = encoding or self.encoding
if length is None:
data = bytearray()
while True:
byte = stream.read(1)
if byte == b'\x00':
break
data += byte
return data.decode(encoding)
else:
data = stream.read(length)
return data.decode(encoding, errors='ignore').rstrip('\x00')
def write_string(self, stream, value, encoding=None):
encoding = encoding or self.encoding
data = value.encode(encoding)
stream.write(data)
class NPK3Tool:
def __init__(self, key, iv=None):
self.key = key
self.iv = iv or b'\x42\x79\x20\x4D\x61\x72\x63\x75\x73\x73\x61\x63\x61\x6e\x61\x00'
self.enable_compression = True
self.enable_segmentation = True
self.force_segmentation = False
self.npk_version = 3
self.npk_minor_version = 1
self.max_section_size = 0x10000
self.encoding = 'utf-8'
self.dont_compress = [".png", ".ogg", ".jpg", ".mpg"]
self.struct_stream = StructStream(encoding=self.encoding)
def unpack(self, package_path, output_dir=None):
if output_dir is None:
output_dir = os.path.splitext(package_path)[0] + "~"
os.makedirs(output_dir, exist_ok=True)
with open(package_path, 'rb') as f:
magic = f.read(4)
print(magic)
if magic == b'NPK3':
self.npk_version = 3
elif magic == b'NPK2':
self.npk_version = 2
else:
raise ValueError("Unsupported NPK version: {}".format(magic))
f.seek(8)
self.iv = f.read(16)
print(self.iv)
entry_table = self.get_entry_table(f)
entries = self.get_entries(entry_table)
for entry in entries:
print(f"Extracting File: {entry['FilePath']}")
output_path = os.path.join(output_dir, entry['FilePath'].replace('/', os.path.sep))
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'wb') as outfile:
for segment in entry['SegmentsInfo']:
offset = segment['Offset']
aligned_size = segment['AlignedSize']
f.seek(offset)
data = f.read(aligned_size)
decrypted_data = self.decrypt(data)
if segment['RealSize'] < segment['DecompressedSize']:
decompressed_data = self.decompress(decrypted_data)
outfile.write(decompressed_data)
else:
outfile.write(decrypted_data)
def repack(self, input_dir, output_path=None):
if output_path is None:
output_path = os.path.splitext(input_dir)[0] + "_New.npk"
input_dir = os.path.abspath(input_dir)
file_paths = []
relative_files = []
for root, _, files in os.walk(input_dir):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
relative_files.append(os.path.relpath(file_path, input_dir).replace(os.path.sep, '/'))
entries = self.create_initial_entries(relative_files, file_paths)
with open(output_path, 'wb') as f:
f.write(b'NPK3' if self.npk_version == 3 else b'NPK2')
f.write(struct.pack('I', self.npk_minor_version))
f.write(self.iv)
f.write(struct.pack('I', len(file_paths)))
table_data = self.build_entries(entries)
encrypted_table = self.encrypt(table_data)
f.write(struct.pack('I', len(encrypted_table)))
table_pos = f.tell()
f.seek(table_pos + len(encrypted_table))
for i, entry in enumerate(entries):
print(f"Writing File: {entry['FilePath']}")
file_path = file_paths[i]
ext = os.path.splitext(file_path).lower()
compress = self.enable_compression and ext not in self.dont_compress
read_pos = 0
for x, segment in enumerate(entry['SegmentsInfo']):
with open(file_path, 'rb') as infile:
infile.seek(read_pos)
segment_data = infile.read(segment['DecompressedSize'])
read_pos += len(segment_data)
if compress:
compressed_data = self.compress(segment_data)
if len(compressed_data) < len(segment_data):
segment_data = compressed_data
encrypted_data = self.encrypt(segment_data)
entry['SegmentsInfo'][x]['RealSize'] = len(segment_data)
entry['SegmentsInfo'][x]['AlignedSize'] = len(encrypted_data)
entry['SegmentsInfo'][x]['Offset'] = f.tell()
f.write(encrypted_data)
f.seek(table_pos)
print("Writing File Index...")
f.write(encrypted_table)
def get_entries(self, entry_table):
entries = []
with io.BytesIO(entry_table) as f:
while f.tell() < len(entry_table):
entry = {}
entry['SegmentationMode'] = self.struct_stream.read_struct(f, 'B')
entry['FileNameSize'] = self.struct_stream.read_struct(f, 'H')
entry['FilePath'] = self.struct_stream.read_string(f, entry['FileNameSize'])
entry['FileSize'] = self.struct_stream.read_struct(f, 'I')
entry['SHA256'] = f.read(0x20)
entry['SegmentCount'] = self.struct_stream.read_struct(f, 'I')
entry['SegmentsInfo'] = []
for _ in range(entry['SegmentCount']):
segment = {}
segment['Offset'] = self.struct_stream.read_struct(f, 'Q')
segment['AlignedSize'] = self.struct_stream.read_struct(f, 'I')
segment['RealSize'] = self.struct_stream.read_struct(f, 'I')
segment['DecompressedSize'] = self.struct_stream.read_struct(f, 'I')
entry['SegmentsInfo'].append(segment)
entries.append(entry)
return entries
def get_entry_table(self, package_file):
package_file.seek(0x1C)
table_size = self.struct_stream.read_struct(package_file, 'I')
package_file.seek(0x20)
encrypted_table = package_file.read(table_size)
return self.decrypt(encrypted_table)
def build_entries(self, entries):
with io.BytesIO() as f:
for entry in entries:
self.struct_stream.write_struct(f, 'B', entry['SegmentationMode'])
self.struct_stream.write_struct(f, 'H', len(entry['FilePath']))
self.struct_stream.write_string(f, entry['FilePath'])
self.struct_stream.write_struct(f, 'I', entry['FileSize'])
f.write(entry['SHA256'])
self.struct_stream.write_struct(f, 'I', len(entry['SegmentsInfo']))
for segment in entry['SegmentsInfo']:
self.struct_stream.write_struct(f, 'Q', segment['Offset'])
self.struct_stream.write_struct(f, 'I', segment['AlignedSize'])
self.struct_stream.write_struct(f, 'I', segment['RealSize'])
self.struct_stream.write_struct(f, 'I', segment['DecompressedSize'])
return f.getvalue()
def create_initial_entries(self, relative_files, file_paths):
print("Loading Files...")
entries = []
for i, file_path in enumerate(file_paths):
with open(file_path, 'rb') as f:
data = f.read()
entry = {}
entry['FilePath'] = relative_files[i]
entry['FileSize'] = len(data)
entry['SHA256'] = hashlib.sha256(data).digest()
remaining = entry['FileSize']
if self.enable_segmentation or remaining > 2 ** 32 - 1 or self.force_segmentation:
if not self.enable_segmentation:
print(
f"Big File Detected: '{entry['FilePath']}', Enforcing Segmentation",
file=sys.stderr,
)
entry['SegmentsInfo'] = []
entry['SegmentationMode'] = 0 if len(entry['SegmentsInfo']) > 1 else 1
if self.force_segmentation:
entry['SegmentationMode'] = 0
while remaining > 0:
max_bytes = min(remaining, self.max_section_size)
segment = {}
segment['Offset'] = 0
segment['DecompressedSize'] = max_bytes
segment['RealSize'] = max_bytes
segment['AlignedSize'] = (max_bytes + 0xF) & (~0xF)
entry['SegmentsInfo'].append(segment)
remaining -= max_bytes
else:
entry['SegmentationMode'] = 1
segment = {}
segment['Offset'] = 0
segment['DecompressedSize'] = remaining
segment['RealSize'] = remaining
segment['AlignedSize'] = (remaining + 0xF) & (~0xF)
entry['SegmentsInfo'] = [segment]
entries.append(entry)
return entries
def encrypt(self, data):
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
return cipher.encrypt(pad(data, AES.block_size))
def decrypt(self, data):
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
return unpad(cipher.decrypt(data), AES.block_size)
def compress(self, data):
if self.npk_version == 3:
return zstandard.compress(data)
elif self.npk_version == 2:
import zlib
return zlib.compress(data, level=9)
else:
raise ValueError("Unsupported NPK version: {}".format(self.npk_version))
def decompress(self, data):
if self.npk_version == 3:
return zstandard.decompress(data)
elif self.npk_version == 2:
import zlib
try:
# import gzip
# return gzip.decompress(data)
return zlib.decompress(data, -zlib.MAX_WBITS)
except zlib.error as e:
print(f"Zlib error: {e}", file=sys.stderr)
else:
raise ValueError("Unsupported NPK version: {}".format(self.npk_version))
def main():
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} [options] <input> <output>")
print("Options:")
print(" -u Unpack NPK archive")
print(" -r Repack directory to NPK archive")
print(" -key <key> Set encryption key (hexadecimal)")
print(" -iv <iv> Set initialization vector (hexadecimal)")
sys.exit(1)
input_path = sys.argv[-2]
output_path = sys.argv[-1]
key = None
iv = None
for i in range(1, len(sys.argv) - 2):
print(sys.argv[i])
if sys.argv[i] == '-key':
key = bytes.fromhex(sys.argv[i + 1])
elif sys.argv[i] == '-iv':
iv = bytes.fromhex(sys.argv[i + 1])
if key is None:
print("Error: Encryption key is required", file=sys.stderr)
sys.exit(1)
tool = NPK3Tool(key, iv)
if sys.argv[1] == '-u':
tool.unpack(input_path, output_path)
elif sys.argv[1] == '-r':
tool.repack(input_path, output_path)
else:
print(f"Error: Invalid operation: {sys.argv[1]}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()