From a7b7fb560a843178458e62972b5aaa60e267566d Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 5 Aug 2022 13:50:03 +0530 Subject: [PATCH] Add tests for AES256GCM --- kitty/crypto.c | 21 ++++++++++----------- kitty_tests/crypto.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/kitty/crypto.c b/kitty/crypto.c index 4c6aa7d0b..cb902d1ea 100644 --- a/kitty/crypto.c +++ b/kitty/crypto.c @@ -355,23 +355,22 @@ typedef struct { PyObject_HEAD EVP_CIPHER_CTX *ctx; - unsigned char iv[12], tag[16]; int state; } AES256GCMDecrypt; static PyObject * new_aes256gcmdecrypt(PyTypeObject *type, PyObject *args, PyObject *kwds UNUSED) { - Secret *key; const unsigned char *iv, *tag; Py_ssize_t iv_len, tag_len; + Secret *key; unsigned char *iv, *tag; Py_ssize_t iv_len, tag_len; if (!PyArg_ParseTuple(args, "O!y#y#", &Secret_Type, &key, &iv, &iv_len, &tag, &tag_len)) return NULL; if (key->secret_len != 32) { PyErr_SetString(PyExc_ValueError, "The key for AES 256 GCM must be 32 bytes long"); return NULL; } - if (iv_len != sizeof ((AES256GCMDecrypt*)0)->iv) { PyErr_SetString(PyExc_ValueError, "Incorrect iv length for AES 256 GCM"); return NULL; } - if (tag_len != sizeof ((AES256GCMDecrypt*)0)->tag) { PyErr_SetString(PyExc_ValueError, "Incorrect tag length for AES 256 GCM"); return NULL; } + if (iv_len != 12) { PyErr_SetString(PyExc_ValueError, "Incorrect iv length for AES 256 GCM"); return NULL; } + if (tag_len != 16) { PyErr_SetString(PyExc_ValueError, "Incorrect tag length for AES 256 GCM"); return NULL; } AES256GCMDecrypt *self = (AES256GCMDecrypt *)type->tp_alloc(type, 0); if (!self) return NULL; if (!(self->ctx = EVP_CIPHER_CTX_new())) { Py_CLEAR(self); return set_error_from_openssl("Failed to allocate decryption context"); } - memcpy(self->iv, iv, iv_len); memcpy(self->tag, tag, tag_len); - if (1 != EVP_DecryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, self->iv)) { + if (1 != EVP_DecryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, iv)) { Py_CLEAR(self); return set_error_from_openssl("Failed to initialize encryption context"); } + if (!EVP_CIPHER_CTX_ctrl(self->ctx, EVP_CTRL_GCM_SET_TAG, tag_len, tag)) { Py_CLEAR(self); return set_error_from_openssl("Failed to set the tag"); } return (PyObject*)self; } @@ -397,7 +396,7 @@ add_data_to_be_decrypted(AES256GCMDecrypt *self, PyObject *args) { const char *ciphertext; Py_ssize_t ciphertext_len; int finish_decryption = 0; if (!PyArg_ParseTuple(args, "y#|p", &ciphertext, &ciphertext_len, &finish_decryption)) return NULL; - PyObject *plaintext = PyBytes_FromStringAndSize(NULL, ciphertext_len + 256); + PyObject *plaintext = PyBytes_FromStringAndSize(NULL, ciphertext_len + 2 * EVP_CIPHER_CTX_block_size(self->ctx)); if (!plaintext) return NULL; self->state = 1; int offset = 0; @@ -409,12 +408,12 @@ add_data_to_be_decrypted(AES256GCMDecrypt *self, PyObject *args) { } if (finish_decryption) { int len = PyBytes_GET_SIZE(plaintext) - offset; - if (0 <= EVP_DecryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext) + offset, &len)) { - Py_CLEAR(plaintext); return set_error_from_openssl("Failed to finish decryption"); } - offset += len; + int ret = EVP_DecryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext) + offset, &len); self->state = 2; + if (ret <= 0) { Py_CLEAR(plaintext); PyErr_SetString(Crypto_Exception, "Failed to finish decrypt"); return NULL; } + offset += len; } - if (offset != PyBytes_GET_SIZE(ciphertext)) { _PyBytes_Resize(&plaintext, offset); if (!plaintext) return NULL; } + if (offset != PyBytes_GET_SIZE(plaintext)) { _PyBytes_Resize(&plaintext, offset); if (!plaintext) return NULL; } return plaintext; } diff --git a/kitty_tests/crypto.py b/kitty_tests/crypto.py index 00cd918f4..bfbe2cb56 100644 --- a/kitty_tests/crypto.py +++ b/kitty_tests/crypto.py @@ -2,13 +2,17 @@ # License: GPLv3 Copyright: 2022, Kovid Goyal +import os + from . import BaseTest class TestCrypto(BaseTest): def test_elliptic_curve_data_exchange(self): - from kitty.fast_data_types import EllipticCurveKey + from kitty.fast_data_types import ( + AES256GCMDecrypt, AES256GCMEncrypt, CryptoError, EllipticCurveKey + ) alice = EllipticCurveKey() bob = EllipticCurveKey() alice_secret = alice.derive_secret(bob.public) @@ -16,3 +20,31 @@ class TestCrypto(BaseTest): self.assertEqual(len(alice_secret), 32) self.assertEqual(len(bob_secret), 32) self.assertEqual(alice_secret, bob_secret) + + auth_data = os.urandom(213) + plaintext = os.urandom(1011) + e = AES256GCMEncrypt(alice_secret) + e.add_authenticated_but_unencrypted_data(auth_data) + ciphertext = e.add_data_to_be_encrypted(plaintext, True) + + d = AES256GCMDecrypt(bob_secret, e.iv, e.tag) + d.add_data_to_be_authenticated_but_not_decrypted(auth_data) + q = d.add_data_to_be_decrypted(ciphertext, True) + self.ae(q, plaintext) + + def corrupt_data(data): + b = bytearray(data) + b[0] = (b[0] + 13) % 256 + return bytes(b) + + d = AES256GCMDecrypt(bob_secret, e.iv, corrupt_data(e.tag)) + d.add_data_to_be_authenticated_but_not_decrypted(auth_data) + self.assertRaises(CryptoError, d.add_data_to_be_decrypted, ciphertext, True) + + d = AES256GCMDecrypt(bob_secret, e.iv, e.tag) + d.add_data_to_be_authenticated_but_not_decrypted(corrupt_data(auth_data)) + self.assertRaises(CryptoError, d.add_data_to_be_decrypted, ciphertext, True) + + d = AES256GCMDecrypt(bob_secret, e.iv, e.tag) + d.add_data_to_be_authenticated_but_not_decrypted(auth_data) + self.assertRaises(CryptoError, d.add_data_to_be_decrypted, corrupt_data(ciphertext), True)