Initial commit
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
output
|
||||||
|
*.npk
|
||||||
|
venv
|
||||||
315
main.py
Normal file
315
main.py
Normal file
@@ -0,0 +1,315 @@
|
|||||||
|
import hashlib
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import zstandard
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from Crypto.Util.Padding import pad, unpad
|
||||||
|
|
||||||
|
|
||||||
|
class StructStream:
|
||||||
|
def __init__(self, big_endian=False, encoding=None):
|
||||||
|
self.big_endian = big_endian
|
||||||
|
self.encoding = encoding or 'utf-8'
|
||||||
|
|
||||||
|
def read_struct(self, stream, struct_fmt):
|
||||||
|
fmt = struct_fmt
|
||||||
|
if self.big_endian:
|
||||||
|
fmt = '>' + fmt
|
||||||
|
size = struct.calcsize(fmt)
|
||||||
|
data = stream.read(size)
|
||||||
|
return struct.unpack(fmt, data)[0]
|
||||||
|
|
||||||
|
def write_struct(self, stream, struct_fmt, value):
|
||||||
|
fmt = struct_fmt
|
||||||
|
if self.big_endian:
|
||||||
|
fmt = '>' + fmt
|
||||||
|
data = struct.pack(fmt, value)
|
||||||
|
stream.write(data)
|
||||||
|
|
||||||
|
def read_string(self, stream, length=None, encoding=None):
|
||||||
|
encoding = encoding or self.encoding
|
||||||
|
if length is None:
|
||||||
|
data = bytearray()
|
||||||
|
while True:
|
||||||
|
byte = stream.read(1)
|
||||||
|
if byte == b'\x00':
|
||||||
|
break
|
||||||
|
data += byte
|
||||||
|
return data.decode(encoding)
|
||||||
|
else:
|
||||||
|
data = stream.read(length)
|
||||||
|
return data.decode(encoding, errors='ignore').rstrip('\x00')
|
||||||
|
|
||||||
|
def write_string(self, stream, value, encoding=None):
|
||||||
|
encoding = encoding or self.encoding
|
||||||
|
data = value.encode(encoding)
|
||||||
|
stream.write(data)
|
||||||
|
|
||||||
|
class NPK3Tool:
|
||||||
|
|
||||||
|
def __init__(self, key, iv=None):
|
||||||
|
self.key = key
|
||||||
|
self.iv = iv or b'\x42\x79\x20\x4D\x61\x72\x63\x75\x73\x73\x61\x63\x61\x6e\x61\x00'
|
||||||
|
self.enable_compression = True
|
||||||
|
self.enable_segmentation = True
|
||||||
|
self.force_segmentation = False
|
||||||
|
self.npk_version = 3
|
||||||
|
self.npk_minor_version = 1
|
||||||
|
self.max_section_size = 0x10000
|
||||||
|
self.encoding = 'utf-8'
|
||||||
|
self.dont_compress = [".png", ".ogg", ".jpg", ".mpg"]
|
||||||
|
self.struct_stream = StructStream(encoding=self.encoding)
|
||||||
|
|
||||||
|
def unpack(self, package_path, output_dir=None):
|
||||||
|
if output_dir is None:
|
||||||
|
output_dir = os.path.splitext(package_path)[0] + "~"
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
|
||||||
|
with open(package_path, 'rb') as f:
|
||||||
|
magic = f.read(4)
|
||||||
|
print(magic)
|
||||||
|
if magic == b'NPK3':
|
||||||
|
self.npk_version = 3
|
||||||
|
elif magic == b'NPK2':
|
||||||
|
self.npk_version = 2
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported NPK version: {}".format(magic))
|
||||||
|
|
||||||
|
f.seek(8)
|
||||||
|
self.iv = f.read(16)
|
||||||
|
print(self.iv)
|
||||||
|
entry_table = self.get_entry_table(f)
|
||||||
|
entries = self.get_entries(entry_table)
|
||||||
|
|
||||||
|
for entry in entries:
|
||||||
|
print(f"Extracting File: {entry['FilePath']}")
|
||||||
|
output_path = os.path.join(output_dir, entry['FilePath'].replace('/', os.path.sep))
|
||||||
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||||
|
with open(output_path, 'wb') as outfile:
|
||||||
|
for segment in entry['SegmentsInfo']:
|
||||||
|
offset = segment['Offset']
|
||||||
|
aligned_size = segment['AlignedSize']
|
||||||
|
f.seek(offset)
|
||||||
|
data = f.read(aligned_size)
|
||||||
|
decrypted_data = self.decrypt(data)
|
||||||
|
if segment['RealSize'] < segment['DecompressedSize']:
|
||||||
|
decompressed_data = self.decompress(decrypted_data)
|
||||||
|
outfile.write(decompressed_data)
|
||||||
|
else:
|
||||||
|
outfile.write(decrypted_data)
|
||||||
|
|
||||||
|
def repack(self, input_dir, output_path=None):
|
||||||
|
if output_path is None:
|
||||||
|
output_path = os.path.splitext(input_dir)[0] + "_New.npk"
|
||||||
|
input_dir = os.path.abspath(input_dir)
|
||||||
|
|
||||||
|
file_paths = []
|
||||||
|
relative_files = []
|
||||||
|
for root, _, files in os.walk(input_dir):
|
||||||
|
for file in files:
|
||||||
|
file_path = os.path.join(root, file)
|
||||||
|
file_paths.append(file_path)
|
||||||
|
relative_files.append(os.path.relpath(file_path, input_dir).replace(os.path.sep, '/'))
|
||||||
|
|
||||||
|
entries = self.create_initial_entries(relative_files, file_paths)
|
||||||
|
|
||||||
|
with open(output_path, 'wb') as f:
|
||||||
|
f.write(b'NPK3' if self.npk_version == 3 else b'NPK2')
|
||||||
|
f.write(struct.pack('I', self.npk_minor_version))
|
||||||
|
f.write(self.iv)
|
||||||
|
f.write(struct.pack('I', len(file_paths)))
|
||||||
|
|
||||||
|
table_data = self.build_entries(entries)
|
||||||
|
encrypted_table = self.encrypt(table_data)
|
||||||
|
f.write(struct.pack('I', len(encrypted_table)))
|
||||||
|
|
||||||
|
table_pos = f.tell()
|
||||||
|
f.seek(table_pos + len(encrypted_table))
|
||||||
|
|
||||||
|
for i, entry in enumerate(entries):
|
||||||
|
print(f"Writing File: {entry['FilePath']}")
|
||||||
|
file_path = file_paths[i]
|
||||||
|
ext = os.path.splitext(file_path).lower()
|
||||||
|
compress = self.enable_compression and ext not in self.dont_compress
|
||||||
|
|
||||||
|
read_pos = 0
|
||||||
|
for x, segment in enumerate(entry['SegmentsInfo']):
|
||||||
|
with open(file_path, 'rb') as infile:
|
||||||
|
infile.seek(read_pos)
|
||||||
|
segment_data = infile.read(segment['DecompressedSize'])
|
||||||
|
read_pos += len(segment_data)
|
||||||
|
if compress:
|
||||||
|
compressed_data = self.compress(segment_data)
|
||||||
|
if len(compressed_data) < len(segment_data):
|
||||||
|
segment_data = compressed_data
|
||||||
|
encrypted_data = self.encrypt(segment_data)
|
||||||
|
entry['SegmentsInfo'][x]['RealSize'] = len(segment_data)
|
||||||
|
entry['SegmentsInfo'][x]['AlignedSize'] = len(encrypted_data)
|
||||||
|
entry['SegmentsInfo'][x]['Offset'] = f.tell()
|
||||||
|
f.write(encrypted_data)
|
||||||
|
|
||||||
|
f.seek(table_pos)
|
||||||
|
print("Writing File Index...")
|
||||||
|
f.write(encrypted_table)
|
||||||
|
|
||||||
|
def get_entries(self, entry_table):
|
||||||
|
entries = []
|
||||||
|
with io.BytesIO(entry_table) as f:
|
||||||
|
while f.tell() < len(entry_table):
|
||||||
|
entry = {}
|
||||||
|
entry['SegmentationMode'] = self.struct_stream.read_struct(f, 'B')
|
||||||
|
entry['FileNameSize'] = self.struct_stream.read_struct(f, 'H')
|
||||||
|
entry['FilePath'] = self.struct_stream.read_string(f, entry['FileNameSize'])
|
||||||
|
entry['FileSize'] = self.struct_stream.read_struct(f, 'I')
|
||||||
|
entry['SHA256'] = f.read(0x20)
|
||||||
|
entry['SegmentCount'] = self.struct_stream.read_struct(f, 'I')
|
||||||
|
entry['SegmentsInfo'] = []
|
||||||
|
for _ in range(entry['SegmentCount']):
|
||||||
|
segment = {}
|
||||||
|
segment['Offset'] = self.struct_stream.read_struct(f, 'Q')
|
||||||
|
segment['AlignedSize'] = self.struct_stream.read_struct(f, 'I')
|
||||||
|
segment['RealSize'] = self.struct_stream.read_struct(f, 'I')
|
||||||
|
segment['DecompressedSize'] = self.struct_stream.read_struct(f, 'I')
|
||||||
|
entry['SegmentsInfo'].append(segment)
|
||||||
|
entries.append(entry)
|
||||||
|
return entries
|
||||||
|
|
||||||
|
def get_entry_table(self, package_file):
|
||||||
|
package_file.seek(0x1C)
|
||||||
|
table_size = self.struct_stream.read_struct(package_file, 'I')
|
||||||
|
package_file.seek(0x20)
|
||||||
|
encrypted_table = package_file.read(table_size)
|
||||||
|
return self.decrypt(encrypted_table)
|
||||||
|
|
||||||
|
def build_entries(self, entries):
|
||||||
|
with io.BytesIO() as f:
|
||||||
|
for entry in entries:
|
||||||
|
self.struct_stream.write_struct(f, 'B', entry['SegmentationMode'])
|
||||||
|
self.struct_stream.write_struct(f, 'H', len(entry['FilePath']))
|
||||||
|
self.struct_stream.write_string(f, entry['FilePath'])
|
||||||
|
self.struct_stream.write_struct(f, 'I', entry['FileSize'])
|
||||||
|
f.write(entry['SHA256'])
|
||||||
|
self.struct_stream.write_struct(f, 'I', len(entry['SegmentsInfo']))
|
||||||
|
for segment in entry['SegmentsInfo']:
|
||||||
|
self.struct_stream.write_struct(f, 'Q', segment['Offset'])
|
||||||
|
self.struct_stream.write_struct(f, 'I', segment['AlignedSize'])
|
||||||
|
self.struct_stream.write_struct(f, 'I', segment['RealSize'])
|
||||||
|
self.struct_stream.write_struct(f, 'I', segment['DecompressedSize'])
|
||||||
|
return f.getvalue()
|
||||||
|
|
||||||
|
def create_initial_entries(self, relative_files, file_paths):
|
||||||
|
print("Loading Files...")
|
||||||
|
entries = []
|
||||||
|
for i, file_path in enumerate(file_paths):
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
entry = {}
|
||||||
|
entry['FilePath'] = relative_files[i]
|
||||||
|
entry['FileSize'] = len(data)
|
||||||
|
entry['SHA256'] = hashlib.sha256(data).digest()
|
||||||
|
remaining = entry['FileSize']
|
||||||
|
if self.enable_segmentation or remaining > 2 ** 32 - 1 or self.force_segmentation:
|
||||||
|
if not self.enable_segmentation:
|
||||||
|
print(
|
||||||
|
f"Big File Detected: '{entry['FilePath']}', Enforcing Segmentation",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
entry['SegmentsInfo'] = []
|
||||||
|
entry['SegmentationMode'] = 0 if len(entry['SegmentsInfo']) > 1 else 1
|
||||||
|
if self.force_segmentation:
|
||||||
|
entry['SegmentationMode'] = 0
|
||||||
|
while remaining > 0:
|
||||||
|
max_bytes = min(remaining, self.max_section_size)
|
||||||
|
segment = {}
|
||||||
|
segment['Offset'] = 0
|
||||||
|
segment['DecompressedSize'] = max_bytes
|
||||||
|
segment['RealSize'] = max_bytes
|
||||||
|
segment['AlignedSize'] = (max_bytes + 0xF) & (~0xF)
|
||||||
|
entry['SegmentsInfo'].append(segment)
|
||||||
|
remaining -= max_bytes
|
||||||
|
else:
|
||||||
|
entry['SegmentationMode'] = 1
|
||||||
|
segment = {}
|
||||||
|
segment['Offset'] = 0
|
||||||
|
segment['DecompressedSize'] = remaining
|
||||||
|
segment['RealSize'] = remaining
|
||||||
|
segment['AlignedSize'] = (remaining + 0xF) & (~0xF)
|
||||||
|
entry['SegmentsInfo'] = [segment]
|
||||||
|
entries.append(entry)
|
||||||
|
return entries
|
||||||
|
|
||||||
|
def encrypt(self, data):
|
||||||
|
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
|
||||||
|
return cipher.encrypt(pad(data, AES.block_size))
|
||||||
|
|
||||||
|
def decrypt(self, data):
|
||||||
|
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
|
||||||
|
return unpad(cipher.decrypt(data), AES.block_size)
|
||||||
|
|
||||||
|
def compress(self, data):
|
||||||
|
if self.npk_version == 3:
|
||||||
|
return zstandard.compress(data)
|
||||||
|
elif self.npk_version == 2:
|
||||||
|
import zlib
|
||||||
|
return zlib.compress(data, level=9)
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported NPK version: {}".format(self.npk_version))
|
||||||
|
|
||||||
|
def decompress(self, data):
|
||||||
|
if self.npk_version == 3:
|
||||||
|
return zstandard.decompress(data)
|
||||||
|
elif self.npk_version == 2:
|
||||||
|
import zlib
|
||||||
|
try:
|
||||||
|
# import gzip
|
||||||
|
# return gzip.decompress(data)
|
||||||
|
return zlib.decompress(data, -zlib.MAX_WBITS)
|
||||||
|
except zlib.error as e:
|
||||||
|
print(f"Zlib error: {e}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported NPK version: {}".format(self.npk_version))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) < 3:
|
||||||
|
print(f"Usage: {sys.argv[0]} [options] <input> <output>")
|
||||||
|
print("Options:")
|
||||||
|
print(" -u Unpack NPK archive")
|
||||||
|
print(" -r Repack directory to NPK archive")
|
||||||
|
print(" -key <key> Set encryption key (hexadecimal)")
|
||||||
|
print(" -iv <iv> Set initialization vector (hexadecimal)")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
input_path = sys.argv[-2]
|
||||||
|
output_path = sys.argv[-1]
|
||||||
|
|
||||||
|
key = None
|
||||||
|
iv = None
|
||||||
|
|
||||||
|
for i in range(1, len(sys.argv) - 2):
|
||||||
|
print(sys.argv[i])
|
||||||
|
if sys.argv[i] == '-key':
|
||||||
|
key = bytes.fromhex(sys.argv[i + 1])
|
||||||
|
elif sys.argv[i] == '-iv':
|
||||||
|
iv = bytes.fromhex(sys.argv[i + 1])
|
||||||
|
|
||||||
|
if key is None:
|
||||||
|
print("Error: Encryption key is required", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
tool = NPK3Tool(key, iv)
|
||||||
|
|
||||||
|
if sys.argv[1] == '-u':
|
||||||
|
tool.unpack(input_path, output_path)
|
||||||
|
elif sys.argv[1] == '-r':
|
||||||
|
tool.repack(input_path, output_path)
|
||||||
|
else:
|
||||||
|
print(f"Error: Invalid operation: {sys.argv[1]}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
pycryptodome
|
||||||
|
zstandard
|
||||||
Reference in New Issue
Block a user