forked from LAION-AI/Open-Assistant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxor_codec.py
100 lines (89 loc) · 3.5 KB
/
xor_codec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import gzip
import os
import shutil
import sys
from pathlib import Path
import numpy
def xor_uncompressed(dst, src_payload, src_base, block_size=4096):
fp_payload = open(src_payload, "rb")
fp_base = open(src_base, "rb")
with open(dst, "wb") as fp:
while True:
buf1 = numpy.array(bytearray(fp_payload.read(block_size)), dtype=numpy.uint8)
buf2 = numpy.array(bytearray(fp_base.read(block_size)), dtype=numpy.uint8)
padding = len(buf1) - len(buf2)
if padding > 0:
buf2 = numpy.pad(buf2, (0, padding), "constant", constant_values=(0,))
if padding < 0:
buf2 = buf2[: len(buf1)]
buf = numpy.bitwise_xor(buf1, buf2)
fp.write(buf)
if len(buf1) < block_size:
break
fp_payload.close()
fp_base.close()
def xor_encode(dst, src_payload, src_base, block_size=4096):
fp_payload = open(src_payload, "rb")
fp_base = open(src_base, "rb")
with gzip.open(dst, "wb") as fp:
while True:
buf1 = numpy.array(bytearray(fp_payload.read(block_size)), dtype=numpy.uint8)
buf2 = numpy.array(bytearray(fp_base.read(block_size)), dtype=numpy.uint8)
padding = len(buf1) - len(buf2)
if padding > 0:
buf2 = numpy.pad(buf2, (0, padding), "constant", constant_values=(0,))
if padding < 0:
buf2 = buf2[: len(buf1)]
buf = numpy.bitwise_xor(buf1, buf2)
fp.write(buf)
if len(buf1) < block_size:
break
fp_payload.close()
fp_base.close()
def xor_decode(dst, src_payload, src_base, block_size=4096):
fp_payload = gzip.open(src_payload, "rb")
fp_base = open(src_base, "rb")
with open(dst, "wb") as fp:
while True:
buf1 = numpy.array(bytearray(fp_payload.read(block_size)), dtype=numpy.uint8)
buf2 = numpy.array(bytearray(fp_base.read(block_size)), dtype=numpy.uint8)
padding = len(buf1) - len(buf2)
if padding > 0:
buf2 = numpy.pad(buf2, (0, padding), "constant", constant_values=(0,))
if padding < 0:
buf2 = buf2[: len(buf1)]
buf = numpy.bitwise_xor(buf1, buf2)
fp.write(buf)
if len(buf1) < block_size:
break
fp_payload.close()
fp_base.close()
def xor_dir(dst, src_payload, src_base, decode=True, compress=True):
if compress:
xor = xor_decode if decode else xor_encode
else:
xor = xor_uncompressed
Path(dst).mkdir(parents=True, exist_ok=True)
shutil.copy(Path(src_payload) / "added_tokens.json", Path(dst) / "added_tokens.json")
for path in os.listdir(src_payload):
print("[*] Processing '%s'" % path)
try:
xor("%s/%s" % (dst, path), "%s/%s" % (src_payload, path), "%s/%s" % (src_base, path))
except Exception:
print("Exception when processing '%s'" % path)
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Usage: xor.py <DESTINATION> <PAYLOAD SOURCE> <LLAMA SOURCE> [--encode] [--compress]")
exit()
dst = sys.argv[1]
src_payload = sys.argv[2]
src_base = sys.argv[3]
decode = True
compress = False
if len(sys.argv) > 4:
for arg in sys.argv[4:]:
if arg == "--encode":
decode = False
if arg == "--compress":
compress = True
xor_dir(dst, src_payload, src_base, decode=decode, compress=compress)