Add tests for AES256GCM
This commit is contained in:
parent
384eba6cd4
commit
a7b7fb560a
@ -355,23 +355,22 @@ typedef struct {
|
|||||||
PyObject_HEAD
|
PyObject_HEAD
|
||||||
|
|
||||||
EVP_CIPHER_CTX *ctx;
|
EVP_CIPHER_CTX *ctx;
|
||||||
unsigned char iv[12], tag[16];
|
|
||||||
int state;
|
int state;
|
||||||
} AES256GCMDecrypt;
|
} AES256GCMDecrypt;
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
new_aes256gcmdecrypt(PyTypeObject *type, PyObject *args, PyObject *kwds UNUSED) {
|
new_aes256gcmdecrypt(PyTypeObject *type, PyObject *args, PyObject *kwds UNUSED) {
|
||||||
Secret *key; const unsigned char *iv, *tag; Py_ssize_t iv_len, tag_len;
|
Secret *key; unsigned char *iv, *tag; Py_ssize_t iv_len, tag_len;
|
||||||
if (!PyArg_ParseTuple(args, "O!y#y#", &Secret_Type, &key, &iv, &iv_len, &tag, &tag_len)) return NULL;
|
if (!PyArg_ParseTuple(args, "O!y#y#", &Secret_Type, &key, &iv, &iv_len, &tag, &tag_len)) return NULL;
|
||||||
if (key->secret_len != 32) { PyErr_SetString(PyExc_ValueError, "The key for AES 256 GCM must be 32 bytes long"); return NULL; }
|
if (key->secret_len != 32) { PyErr_SetString(PyExc_ValueError, "The key for AES 256 GCM must be 32 bytes long"); return NULL; }
|
||||||
if (iv_len != sizeof ((AES256GCMDecrypt*)0)->iv) { PyErr_SetString(PyExc_ValueError, "Incorrect iv length for AES 256 GCM"); return NULL; }
|
if (iv_len != 12) { PyErr_SetString(PyExc_ValueError, "Incorrect iv length for AES 256 GCM"); return NULL; }
|
||||||
if (tag_len != sizeof ((AES256GCMDecrypt*)0)->tag) { PyErr_SetString(PyExc_ValueError, "Incorrect tag length for AES 256 GCM"); return NULL; }
|
if (tag_len != 16) { PyErr_SetString(PyExc_ValueError, "Incorrect tag length for AES 256 GCM"); return NULL; }
|
||||||
AES256GCMDecrypt *self = (AES256GCMDecrypt *)type->tp_alloc(type, 0);
|
AES256GCMDecrypt *self = (AES256GCMDecrypt *)type->tp_alloc(type, 0);
|
||||||
if (!self) return NULL;
|
if (!self) return NULL;
|
||||||
if (!(self->ctx = EVP_CIPHER_CTX_new())) { Py_CLEAR(self); return set_error_from_openssl("Failed to allocate decryption context"); }
|
if (!(self->ctx = EVP_CIPHER_CTX_new())) { Py_CLEAR(self); return set_error_from_openssl("Failed to allocate decryption context"); }
|
||||||
memcpy(self->iv, iv, iv_len); memcpy(self->tag, tag, tag_len);
|
if (1 != EVP_DecryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, iv)) {
|
||||||
if (1 != EVP_DecryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, self->iv)) {
|
|
||||||
Py_CLEAR(self); return set_error_from_openssl("Failed to initialize encryption context"); }
|
Py_CLEAR(self); return set_error_from_openssl("Failed to initialize encryption context"); }
|
||||||
|
if (!EVP_CIPHER_CTX_ctrl(self->ctx, EVP_CTRL_GCM_SET_TAG, tag_len, tag)) { Py_CLEAR(self); return set_error_from_openssl("Failed to set the tag"); }
|
||||||
return (PyObject*)self;
|
return (PyObject*)self;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -397,7 +396,7 @@ add_data_to_be_decrypted(AES256GCMDecrypt *self, PyObject *args) {
|
|||||||
const char *ciphertext; Py_ssize_t ciphertext_len;
|
const char *ciphertext; Py_ssize_t ciphertext_len;
|
||||||
int finish_decryption = 0;
|
int finish_decryption = 0;
|
||||||
if (!PyArg_ParseTuple(args, "y#|p", &ciphertext, &ciphertext_len, &finish_decryption)) return NULL;
|
if (!PyArg_ParseTuple(args, "y#|p", &ciphertext, &ciphertext_len, &finish_decryption)) return NULL;
|
||||||
PyObject *plaintext = PyBytes_FromStringAndSize(NULL, ciphertext_len + 256);
|
PyObject *plaintext = PyBytes_FromStringAndSize(NULL, ciphertext_len + 2 * EVP_CIPHER_CTX_block_size(self->ctx));
|
||||||
if (!plaintext) return NULL;
|
if (!plaintext) return NULL;
|
||||||
self->state = 1;
|
self->state = 1;
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
@ -409,12 +408,12 @@ add_data_to_be_decrypted(AES256GCMDecrypt *self, PyObject *args) {
|
|||||||
}
|
}
|
||||||
if (finish_decryption) {
|
if (finish_decryption) {
|
||||||
int len = PyBytes_GET_SIZE(plaintext) - offset;
|
int len = PyBytes_GET_SIZE(plaintext) - offset;
|
||||||
if (0 <= EVP_DecryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext) + offset, &len)) {
|
int ret = EVP_DecryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext) + offset, &len);
|
||||||
Py_CLEAR(plaintext); return set_error_from_openssl("Failed to finish decryption"); }
|
|
||||||
offset += len;
|
|
||||||
self->state = 2;
|
self->state = 2;
|
||||||
|
if (ret <= 0) { Py_CLEAR(plaintext); PyErr_SetString(Crypto_Exception, "Failed to finish decrypt"); return NULL; }
|
||||||
|
offset += len;
|
||||||
}
|
}
|
||||||
if (offset != PyBytes_GET_SIZE(ciphertext)) { _PyBytes_Resize(&plaintext, offset); if (!plaintext) return NULL; }
|
if (offset != PyBytes_GET_SIZE(plaintext)) { _PyBytes_Resize(&plaintext, offset); if (!plaintext) return NULL; }
|
||||||
return plaintext;
|
return plaintext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,13 +2,17 @@
|
|||||||
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
|
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
from . import BaseTest
|
from . import BaseTest
|
||||||
|
|
||||||
|
|
||||||
class TestCrypto(BaseTest):
|
class TestCrypto(BaseTest):
|
||||||
|
|
||||||
def test_elliptic_curve_data_exchange(self):
|
def test_elliptic_curve_data_exchange(self):
|
||||||
from kitty.fast_data_types import EllipticCurveKey
|
from kitty.fast_data_types import (
|
||||||
|
AES256GCMDecrypt, AES256GCMEncrypt, CryptoError, EllipticCurveKey
|
||||||
|
)
|
||||||
alice = EllipticCurveKey()
|
alice = EllipticCurveKey()
|
||||||
bob = EllipticCurveKey()
|
bob = EllipticCurveKey()
|
||||||
alice_secret = alice.derive_secret(bob.public)
|
alice_secret = alice.derive_secret(bob.public)
|
||||||
@ -16,3 +20,31 @@ class TestCrypto(BaseTest):
|
|||||||
self.assertEqual(len(alice_secret), 32)
|
self.assertEqual(len(alice_secret), 32)
|
||||||
self.assertEqual(len(bob_secret), 32)
|
self.assertEqual(len(bob_secret), 32)
|
||||||
self.assertEqual(alice_secret, bob_secret)
|
self.assertEqual(alice_secret, bob_secret)
|
||||||
|
|
||||||
|
auth_data = os.urandom(213)
|
||||||
|
plaintext = os.urandom(1011)
|
||||||
|
e = AES256GCMEncrypt(alice_secret)
|
||||||
|
e.add_authenticated_but_unencrypted_data(auth_data)
|
||||||
|
ciphertext = e.add_data_to_be_encrypted(plaintext, True)
|
||||||
|
|
||||||
|
d = AES256GCMDecrypt(bob_secret, e.iv, e.tag)
|
||||||
|
d.add_data_to_be_authenticated_but_not_decrypted(auth_data)
|
||||||
|
q = d.add_data_to_be_decrypted(ciphertext, True)
|
||||||
|
self.ae(q, plaintext)
|
||||||
|
|
||||||
|
def corrupt_data(data):
|
||||||
|
b = bytearray(data)
|
||||||
|
b[0] = (b[0] + 13) % 256
|
||||||
|
return bytes(b)
|
||||||
|
|
||||||
|
d = AES256GCMDecrypt(bob_secret, e.iv, corrupt_data(e.tag))
|
||||||
|
d.add_data_to_be_authenticated_but_not_decrypted(auth_data)
|
||||||
|
self.assertRaises(CryptoError, d.add_data_to_be_decrypted, ciphertext, True)
|
||||||
|
|
||||||
|
d = AES256GCMDecrypt(bob_secret, e.iv, e.tag)
|
||||||
|
d.add_data_to_be_authenticated_but_not_decrypted(corrupt_data(auth_data))
|
||||||
|
self.assertRaises(CryptoError, d.add_data_to_be_decrypted, ciphertext, True)
|
||||||
|
|
||||||
|
d = AES256GCMDecrypt(bob_secret, e.iv, e.tag)
|
||||||
|
d.add_data_to_be_authenticated_but_not_decrypted(auth_data)
|
||||||
|
self.assertRaises(CryptoError, d.add_data_to_be_decrypted, corrupt_data(ciphertext), True)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user