0x00 总述
在分组加密中,明文与密文分组长度是固定的,那么对于较大数据的加密肯定要拆分为多组明文来进行加密运算。在这个过程中,不同分组之间是否需要进行交叉处理、是否需要额外的运算,这些不同的方法就构成了不同的分组加密模式。那么对于不同的加密模式,对应了不同的密码学特性,其也有不同的攻击手段。本文中,我们将对其中的三种进行分析,并整理基本的攻击手段。
0x01 分组加密模式分类
在说攻击手段之前,我们先看一下几种常见的分组加密模式:
电子密码本(ECB)
这是最简单的一种方案,把输入进行分组,每个分组单独进行加密,加密后将密文分组按顺序拼接,就得到了原始数据的密文。解密时只需要将密文分组逐个解密,得到的明文按照密文顺序拼接即得到了明文。
密码块链接(CBC)
每个明文块先与前一个密文块进行异或后,再进行加密。在这种方法中,每个密文块都依赖于它前面的所有明文块。同时,为了保证每条消息的唯一性,在第一个块中需要使用初始化向量。
密文反馈(CFB)
将上一轮密文加密,加密值和明文进行异或得到密文。CFB模式下的分组可以是8字节、1字节甚至1比特。
这种加密模式很有特色,我们重点强调了其分组大小。对于上面提到的ECB、CBC模式,其分组大小取决于分组加密算法所能接受的分组大小,一般是受限于加密算法的,但是CFB模式不同,其可以接受几乎任意小于等于加密算法分组长度的分组大小,例如AES算法接受128bit的分组大小,那使用CFB模式时,对明文进行加密就可以不需要再进行16字节的分组、填充了,而是可以直接规定分组大小为1bit,像流密码那样进行加解密。
输出反馈(OFB)
这个模式与CFB模式很像,唯一的区别就是寄存器中更新的数据,一个是异或前的密文,一个是异或后的密文。
计数器模式(CTR)
这个模式会设定一个计数器,每次加密后都会自增,将计数器的值进行加密后和明文进行异或得到密文。
0x02 攻击方法
对于这四种分组方式,我整理了四种对应的攻击方法。
ECB:重放攻击
ECB模式相对于其他模式的特点就是,ECB不同分组之间互不影响,相同的消息在同一个密钥加密下得到的密文永远都是相同的。对于其他的分组方式,不同的初始向量、不同的加密时机都会影响加密结果。例如CBC模式:
借助这样一个特性,ECB模式可以实现重放攻击。比如现有一个加密后的账本,记录了转账情况:
- Alice pay 10$ to Bob
- Bob pay 10$ to Cathy
- ...
如果每一条数据都是单独用ECB模式加密的,那么如果现在Alice需要再支付10$给Bob,她就可以直接复制第一行的密文粘贴在下方。虽然Alice不知道密钥,但是成功伪造了这样一条支付凭证。
下面的例子可以解释这种攻击:
# -*- coding: utf-8 -*-
from Crypto.Cipher import AES
from os import urandom
import json
from Crypto.Util.number import long_to_bytes, bytes_to_long
public_notebook = []
key = urandom(16)
aes = AES.new(key, AES.MODE_ECB)
def pad(msg):
padding_length = 16 - len(msg) % 16
return msg + bytes(padding_length * [padding_length])
def unpad(msg):
return msg[:-msg[-1]]
def add_note(payer, payee):
record = {
"payer": payer,
"payee": payee,
}
encrypted_record = aes.encrypt(pad(json.dumps(record).encode()))
public_notebook.append(encrypted_record)
return encrypted_record
menu = '''Hi Alice!
Manage to get 5 candy from Bob and you will get flag.
1. Buy a candy
2. Sell a candy
3. Get flag
4. Modify the notebook'''
def main():
candies = 0
money = 100
while True:
print(menu)
choice = int(input(prompt='> '))
if choice == 1:
if money >= 100:
money -= 100
candies += 1
res = add_note("Alice", "Bob")
print("success!", bytes_to_long(res))
else:
print("Money not enough")
elif choice == 2:
if candies > 0:
money += 100
candies -= 1
res = add_note("Alice", "Bob")
print("success!", bytes_to_long(res))
else:
print("Candy not enough")
elif choice == 3:
cnt = 0
for each in public_notebook:
try:
record = json.loads(unpad(aes.decrypt(each)).decode())
except:
print("Error")
return -1
if record["payer"] == "Alice":
cnt += 1
elif record["payee"] == "Alice":
cnt -= 1
if cnt >= 5:
print("Congratulations, you got the flag!")
return 0
else:
print("Candy not enough.")
elif choice == 4:
addition = long_to_bytes(int(input(prompt="> ")))
public_notebook.append(addition)
else:
print("Invalid input.")
if __name__ == "__main__":
main()
这里,可以看到加密消息使用的ECB模式,并且存在重放攻击的可能,那么就能够在获取一次sell密文之后不断地往notebook里添加同样的记录,以达到重放攻击的目的。
得到密文数据后,重复发送4次,就完成了攻击。
CFB:选择明文伪造
在这里还是要先对CFB模式的加密解密方法进行详细的说明。其使用的原理便是内部采用一个移位寄存器,其有一个初始值,每一轮操作如下:
- 使用分组加密算法加密寄存器,得到密文
- 根据CFB的分组长度,从上面得到的密文高位选择对应长度,与明文分组进行异或,异或的结果即为这个分组对应的密文,并把这个结果更新到寄存器高位中。这样,异或时取出一个分组大小,加密后插入一个分组大小,这个寄存器队列仍然保持着原来的大小,可以继续进行下一轮计算。
我们举一个示例来展示这个过程:
我们使用python下pycryptodome中的Crypto.Cipher.AES
来展示。假设密钥key = b'a' * 16, iv = b'b' * 16
,加密明文abc
,分组模式为CFB,分组大小选为1字节(在这个库中,CFB模式默认为1字节)
我们模拟一遍这个过程,分组大小为1字节,第一轮是加密iv后将最高位取出并和明文第一字节异或,作为密文第一字节:
可以看到,我们模拟这个过程得到的第一位和实际第一位是一样的。我们再来模拟第二位和第三位:
这样就解释通了CFB模式的工作原理。
选择明文伪造就是在拥有解密机(可以对输入的任意消息进行解密并获取到密文)的情况下,即使我们不知道密钥,我们也可以去构造明文所对应的密文。
首先我们要知道的一点是,密文等于明文异或寄存器的加密值的高若干位, 对于相同的寄存器状态来说,同样的明文加密得到的结果是固定的。那我们反过来考虑,对于某个状态为r
的寄存器,加密字节a
得到密文b
,那么就可推断出此时寄存器r
加密后的高字节为a xor b
。翻过来,在这个状态下,解密b
得到的就是a
,解密c
得到的就是c xor a xor b
。
现在,我们要构造状态r1
下消息m
的密文,那首先第一步就是要恢复寄存器状态r1
。根据上面的流程分析我们知道,当解密16字节密文后,初始向量中的数据就会被完全替换为刚才输入的16字节密文,这样就达到了控制寄存器的效果。
控制了寄存器之后,我们下一个字节便可以进行任意的输入,因为我们不知道当前寄存器加密后的结果是什么样的,我们便输入任意数据a
进行解密,其返回b
,说明当前位置是用a xor b
来解密的。我们若想构造此位置的密文对应明文为c
,那么这里就应该放置密文为c xor a xor b
,这样就完成了第一字节的伪造。
经过这样的修改,因为上面我们传递的是a
,实际上这个解密过程是一条链,中间错一个字节就会影响寄存器的值,从而影响后续解密的结果,因此在解密第二字节时,我们应重新去构造寄存器,恢复状态r1
后再输入c xor a xor b
,这样就保证了链条上的完整性,继续构造下一位。一位一位的修改过后,就完成了整条消息的伪造。
2021年字节跳动的线上赛中,JustDecrypt题目便考察了这样一个内容。
#!/usr/bin/env python3.9
# -*- coding: utf-8 -*-
import string
import random
import socketserver
import signal
import codecs
from os import urandom
from hashlib import sha256
from Crypto.Cipher import AES
from flag import FLAG
BANNER = rb"""
___ _ ______ _
|_ | | | | _ \ | |
| |_ _ ___| |_ | | | |___ ___ _ __ _ _ _ __ | |_
| | | | / __| __| | | | / _ \/ __| '__| | | | '_ \| __|
/\__/ / |_| \__ \ |_ | |/ / __/ (__| | | |_| | |_) | |_
\____/ \__,_|___/\__| |___/ \___|\___|_| \__, | .__/ \__|
__/ | |
|___/|_|
"""
BLOCK_SIZE = 16
class AES_CFB(object):
def __init__(self):
self.key = urandom(BLOCK_SIZE)
self.iv = urandom(16)
self.aes_encrypt = AES.new(self.key, AES.MODE_CFB, self.iv)
self.aes_decrypt = AES.new(self.key, AES.MODE_CFB, self.iv)
def encrypt(self, plain):
return self.aes_encrypt.encrypt(self.pad(plain))
def decrypt(self, cipher):
return self.unpad(self.aes_decrypt.decrypt(cipher))
@staticmethod
def pad(s):
num = BLOCK_SIZE - (len(s) % BLOCK_SIZE)
return s + bytes([num] * num)
@staticmethod
def unpad(s):
return s[:-s[-1]]
class Task(socketserver.BaseRequestHandler):
def _recvall(self):
BUFF_SIZE = 1024
data = b''
while True:
part = self.request.recv(BUFF_SIZE)
data += part
if len(part) < BUFF_SIZE:
break
return data.strip()
def send(self, msg, newline=True):
try:
if newline:
msg += b'\n'
self.request.sendall(msg)
except:
pass
def recv(self, prompt=b'> '):
self.send(prompt, newline=False)
return self._recvall()
def proof_of_work(self):
random.seed(urandom(32))
alphabet = string.ascii_letters + string.digits
proof = ''.join(random.choices(alphabet, k=32))
hash_value = sha256(proof.encode()).hexdigest()
self.send(f'sha256(XXXX+{proof[4:]}) == {hash_value}'.encode())
nonce = self.recv(prompt=b'Give me XXXX > ')
if len(nonce) != 4 or sha256(nonce + proof[4:].encode()).hexdigest() != hash_value:
return False
return True
def timeout_handler(self, signum, frame):
raise TimeoutError
def handle(self):
try:
signal.signal(signal.SIGALRM, self.timeout_handler)
signal.alarm(60)
self.send(BANNER)
if not self.proof_of_work():
self.send(b'\nWrong!')
self.request.close()
return
self.send(b"It's just a decryption system. And I heard that only the Bytedancer can get secret.")
aes = AES_CFB()
signal.alarm(300)
for i in range(52):
cipher_hex = self.recv(prompt=b'Please enter your cipher in hex > ')
if len(cipher_hex) > 2048:
self.send(b"It's too long!")
continue
try:
cipher = codecs.decode(cipher_hex, 'hex')
except:
self.send(b'Not hex data!')
continue
if len(cipher) == 0 or len(cipher) % BLOCK_SIZE != 0:
self.send(f'Cipher length must be a multiple of {BLOCK_SIZE}!'.encode())
continue
plaintext = aes.decrypt(cipher)
plaintext_hex = codecs.encode(plaintext, 'hex')
self.send(b'Your plaintext in hex: \n%s\n' % plaintext_hex)
if plaintext == b"Hello, I'm a Bytedancer. Please give me the flag!":
self.send(b'OK! Here is your flag: ')
self.send(FLAG.encode())
break
self.send(b'Bye!\n')
except TimeoutError:
self.send(b'\nTimeout!')
except Exception as err:
self.send(b'Something Wrong!')
finally:
self.request.close()
class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
HOST, PORT = '0.0.0.0', 30000
print(HOST, PORT)
server = ForkedServer((HOST, PORT), Task)
server.allow_reuse_address = True
server.serve_forever()
下面是我实现的exp:
# -*- coding: utf-8 -*-
import re
from pwn import *
import string
from Crypto.Util.number import *
from hashlib import sha256
from os import urandom
ip, port = "39.105.181.182", 30001
#context.log_level = "debug"
charset = string.ascii_letters + string.digits
def proof(known, hashcode):
for each1 in charset:
for each2 in charset:
for each3 in charset:
for each4 in charset:
this = each1 + each2 + each3 + each4 + known
if sha256(this.encode()).hexdigest() == hashcode:
return each1 + each2 + each3 + each4
def send(data):
if len(data) % 2 == 1:
data = '0' + data
sh.recvuntil("hex > ")
sh.sendline(data)
if sh.recv(1) == b'N':
raise ValueError
sh.recvuntil("plaintext in hex: \n")
data = sh.recvline(keepends = False)
if len(data) == 0:
return b'\x00'
return long_to_bytes(int(data, 16))
def tohex(s):
return hex(bytes_to_long(s))[2:]
sh = remote(ip, port)
sh.recvuntil("sha256")
data = sh.recvline().decode()
ciphers = b''
known, hashcode = re.findall(r'\(XXXX\+(.*?)\) == (.*?)\n', data)[0]
sh.recvuntil("> ")
log.info("known: %s", known)
log.info("hashcode: %s", hashcode)
ans = proof(known, hashcode)
sh.sendline(ans)
base = urandom(16)
pad = urandom(256)
target = b"Hello, I'm a Bytedancer. Please give me the flag!"
for i in range(49):
this = base + ciphers + long_to_bytes(target[i])
num = 16 - (len(this) % 16)
this += num * b'\x00'
res = send(tohex(this + pad))
# log.info(str(res))
ciphers += long_to_bytes(res[16 + i])
r = urandom(14)
res = send(tohex(base + ciphers + r + b'\x0f' + pad))
# log.info(str(res))
ciphers = ciphers + r + long_to_bytes(res[79])
send(tohex(base))
res = send(tohex(ciphers))
print(res)
sh.interactive()
CBC:字节反转攻击
字节反转攻击达到的效果是,可以实现对某一个明文分组的消息控制,其代价就是会破坏前一个分组的消息。
由于CBC模式的这种特性,前一个分组的密文直接影响下一分组明文,因此修改前一个分组的密文就等于修改了下一分组明文,并且不影响后续的解密。说是字节反转攻击,实际上不是针对单一字节的攻击,而是可以对一整个数据块进行操作。效果如下:
因此,字节反转攻击算是一种危害较大并且利用简单的一种攻击方式。
用下面的例子可以展示这种攻击方法:
# -*- coding: utf-8 -*-
from Crypto.Cipher import AES
import os
from Crypto.Util.number import long_to_bytes, bytes_to_long
target = b'DuanYuFi pay 99$ from LordRiot. '
key = os.urandom(16)
iv = os.urandom(16)
encrypter = AES.new(key, AES.MODE_CBC, iv=iv)
decrypter = AES.new(key, AES.MODE_CBC, iv=iv)
msg = b"Block identify: " + os.urandom(16) + b"DuanYuFi got 10$ from LordRiot. "
cipher = encrypter.encrypt(msg)
print(bytes_to_long(cipher))
new_msg = long_to_bytes(int(input()))
new_msg = decrypter.decrypt(new_msg)
if new_msg[32:] == target:
print("Correct.")
else:
print("Error")
只需要经过精心构造,即可完成攻击:
0x03 总结
本文是我在这里写的第一篇文章,介绍了分组密码中三种分组加密模式对应的有效攻击手段。后续我还会介绍密码分析学、密码编码学中其他部分的相关知识分享,希望同大家一起交流进步。