From 384eba6cd4f6900005f411f8e849fc5f4291d72b Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 5 Aug 2022 13:14:25 +0530 Subject: [PATCH] Wrap the OpenSSL API for AES 256 GCM encryption --- kitty/crypto.c | 199 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 197 insertions(+), 2 deletions(-) diff --git a/kitty/crypto.c b/kitty/crypto.c index c3b930602..4c6aa7d0b 100644 --- a/kitty/crypto.c +++ b/kitty/crypto.c @@ -12,11 +12,14 @@ #include #include #include +#include #include +#include #define SHA1_DIGEST_LENGTH SHA_DIGEST_LENGTH typedef enum HASH_ALGORITHM { SHA1_HASH, SHA224_HASH, SHA256_HASH, SHA384_HASH, SHA512_HASH } HASH_ALGORITHM; +static PyObject* Crypto_Exception = NULL; #define ADD_TYPE(which) \ if (PyType_Ready(&which##_Type) < 0) return false; \ @@ -30,7 +33,7 @@ set_error_from_openssl(const char *prefix) { char *buf = NULL; size_t len = BIO_get_mem_data(bio, &buf); PyObject *msg = PyUnicode_FromStringAndSize(buf, len); - if (msg) PyErr_Format(PyExc_ValueError, "%s: %U", prefix, msg); + if (msg) PyErr_Format(Crypto_Exception, "%s: %U", prefix, msg); BIO_free(bio); Py_CLEAR(msg); return NULL; @@ -245,14 +248,206 @@ PyTypeObject EllipticCurveKey_Type = { }; // }}} +// AES256GCMEncrypt {{{ +typedef struct { + PyObject_HEAD + + EVP_CIPHER_CTX *ctx; + PyObject *iv, *tag; + int state; +} AES256GCMEncrypt; + +static PyObject * +new_aes256gcmencrypt(PyTypeObject *type, PyObject *args, PyObject *kwds UNUSED) { + Secret *key; + if (!PyArg_ParseTuple(args, "O!", &Secret_Type, &key)) return NULL; + if (key->secret_len != 32) { PyErr_SetString(PyExc_ValueError, "The key for AES 256 GCM must be 32 bytes long"); return NULL; } + AES256GCMEncrypt *self = (AES256GCMEncrypt *)type->tp_alloc(type, 0); + if (!self) return NULL; + if (!(self->ctx = EVP_CIPHER_CTX_new())) { Py_CLEAR(self); return set_error_from_openssl("Failed to allocate encryption context"); } + if (!(self->iv = PyBytes_FromStringAndSize(NULL, 12))) { Py_CLEAR(self); return NULL; } + if (1 != RAND_bytes((unsigned char*)PyBytes_AS_STRING(self->iv), PyBytes_GET_SIZE(self->iv))) { Py_CLEAR(self); return NULL; } + if (!(self->tag = PyBytes_FromStringAndSize(NULL, 0))) { Py_CLEAR(self); return NULL; } + if (1 != EVP_EncryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, (const unsigned char*)PyBytes_AS_STRING(self->iv))) { + Py_CLEAR(self); return set_error_from_openssl("Failed to initialize encryption context"); } + return (PyObject*)self; +} + +static void +dealloc_aes256gcmencrypt(AES256GCMEncrypt *self) { + Py_CLEAR(self->iv); Py_CLEAR(self->tag); + if (self->ctx) EVP_CIPHER_CTX_free(self->ctx); + Py_TYPE(self)->tp_free((PyObject*)self); +} + +static PyObject* +add_authenticated_but_unencrypted_data(AES256GCMEncrypt *self, PyObject *args) { + if (self->state > 0) { PyErr_SetString(Crypto_Exception, "Cannot add data once encryption has started"); return NULL; } + const char *aad; Py_ssize_t aad_len; + if (!PyArg_ParseTuple(args, "y#", &aad, &aad_len)) return NULL; + int len; + if (aad_len > 0 && 1 != EVP_EncryptUpdate(self->ctx, NULL, &len, (const unsigned char*)aad, aad_len)) return set_error_from_openssl("Failed to add AAD data"); + Py_RETURN_NONE; +} + +static PyObject* +add_data_to_be_encrypted(AES256GCMEncrypt *self, PyObject *args) { + if (self->state > 1) { PyErr_SetString(Crypto_Exception, "Encryption has been finished"); return NULL; } + const char *plaintext; Py_ssize_t plaintext_len; + int finish_encryption = 0; + if (!PyArg_ParseTuple(args, "y#|p", &plaintext, &plaintext_len, &finish_encryption)) return NULL; + PyObject *ciphertext = PyBytes_FromStringAndSize(NULL, plaintext_len + 256); + if (!ciphertext) return NULL; + self->state = 1; + int offset = 0; + if (plaintext_len) { + int len = PyBytes_GET_SIZE(ciphertext); + if (1 != EVP_EncryptUpdate(self->ctx, (unsigned char*)PyBytes_AS_STRING(ciphertext), &len, (const unsigned char*)plaintext, plaintext_len) + ) { Py_CLEAR(ciphertext); return set_error_from_openssl("Failed to encrypt"); } + offset = len; + } + if (finish_encryption) { + int len = PyBytes_GET_SIZE(ciphertext) - offset; + if (1 != EVP_EncryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(ciphertext) + offset, &len)) { + Py_CLEAR(ciphertext); return set_error_from_openssl("Failed to finish encryption"); } + offset += len; + self->state = 2; + + PyObject *tag = PyBytes_FromStringAndSize(NULL, 16); + if (!tag) { Py_CLEAR(ciphertext); return NULL; } + Py_CLEAR(self->tag); self->tag = tag; + if (1 != EVP_CIPHER_CTX_ctrl(self->ctx, EVP_CTRL_GCM_GET_TAG, PyBytes_GET_SIZE(self->tag), PyBytes_AS_STRING(tag))) { + Py_CLEAR(ciphertext); return NULL; + } + } + if (offset != PyBytes_GET_SIZE(ciphertext)) { _PyBytes_Resize(&ciphertext, offset); if (!ciphertext) return NULL; } + return ciphertext; +} + +static PyMethodDef aes256gcmencrypt_methods[] = { + METHODB(add_authenticated_but_unencrypted_data, METH_VARARGS), + METHODB(add_data_to_be_encrypted, METH_VARARGS), + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + +static PyMemberDef aes256gcmencrypt_members[] = { + {"iv", T_OBJECT_EX, offsetof(AES256GCMEncrypt, iv), READONLY, "IV"}, + {"tag", T_OBJECT_EX, offsetof(AES256GCMEncrypt, tag), READONLY, "The tag for authentication"}, + {NULL} +}; + +PyTypeObject AES256GCMEncrypt_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "fast_data_types.AES256GCMEncrypt", + .tp_basicsize = sizeof(AES256GCMEncrypt), + .tp_dealloc = (destructor)dealloc_aes256gcmencrypt, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "Encrypt using AES 256 GCM with authentication", + .tp_new = new_aes256gcmencrypt, + .tp_methods = aes256gcmencrypt_methods, + .tp_members = aes256gcmencrypt_members, +}; + +// }}} + +// AES256GCMDecrypt {{{ +typedef struct { + PyObject_HEAD + + EVP_CIPHER_CTX *ctx; + unsigned char iv[12], tag[16]; + int state; +} AES256GCMDecrypt; + +static PyObject * +new_aes256gcmdecrypt(PyTypeObject *type, PyObject *args, PyObject *kwds UNUSED) { + Secret *key; const unsigned char *iv, *tag; Py_ssize_t iv_len, tag_len; + if (!PyArg_ParseTuple(args, "O!y#y#", &Secret_Type, &key, &iv, &iv_len, &tag, &tag_len)) return NULL; + if (key->secret_len != 32) { PyErr_SetString(PyExc_ValueError, "The key for AES 256 GCM must be 32 bytes long"); return NULL; } + if (iv_len != sizeof ((AES256GCMDecrypt*)0)->iv) { PyErr_SetString(PyExc_ValueError, "Incorrect iv length for AES 256 GCM"); return NULL; } + if (tag_len != sizeof ((AES256GCMDecrypt*)0)->tag) { PyErr_SetString(PyExc_ValueError, "Incorrect tag length for AES 256 GCM"); return NULL; } + AES256GCMDecrypt *self = (AES256GCMDecrypt *)type->tp_alloc(type, 0); + if (!self) return NULL; + if (!(self->ctx = EVP_CIPHER_CTX_new())) { Py_CLEAR(self); return set_error_from_openssl("Failed to allocate decryption context"); } + memcpy(self->iv, iv, iv_len); memcpy(self->tag, tag, tag_len); + if (1 != EVP_DecryptInit_ex(self->ctx, EVP_aes_256_gcm(), NULL, key->secret, self->iv)) { + Py_CLEAR(self); return set_error_from_openssl("Failed to initialize encryption context"); } + return (PyObject*)self; +} + +static void +dealloc_aes256gcmdecrypt(AES256GCMDecrypt *self) { + if (self->ctx) EVP_CIPHER_CTX_free(self->ctx); + Py_TYPE(self)->tp_free((PyObject*)self); +} + +static PyObject* +add_data_to_be_authenticated_but_not_decrypted(AES256GCMDecrypt *self, PyObject *args) { + if (self->state > 0) { PyErr_SetString(Crypto_Exception, "Cannot add data once decryption has started"); return NULL; } + const char *aad; Py_ssize_t aad_len; + if (!PyArg_ParseTuple(args, "y#", &aad, &aad_len)) return NULL; + int len; + if (aad_len > 0 && 1 != EVP_DecryptUpdate(self->ctx, NULL, &len, (const unsigned char*)aad, aad_len)) return set_error_from_openssl("Failed to add AAD data"); + Py_RETURN_NONE; +} + +static PyObject* +add_data_to_be_decrypted(AES256GCMDecrypt *self, PyObject *args) { + if (self->state > 1) { PyErr_SetString(Crypto_Exception, "Decryption has been finished"); return NULL; } + const char *ciphertext; Py_ssize_t ciphertext_len; + int finish_decryption = 0; + if (!PyArg_ParseTuple(args, "y#|p", &ciphertext, &ciphertext_len, &finish_decryption)) return NULL; + PyObject *plaintext = PyBytes_FromStringAndSize(NULL, ciphertext_len + 256); + if (!plaintext) return NULL; + self->state = 1; + int offset = 0; + if (ciphertext_len) { + int len = PyBytes_GET_SIZE(plaintext); + if (1 != EVP_DecryptUpdate(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext), &len, (const unsigned char*)ciphertext, ciphertext_len) + ) { Py_CLEAR(plaintext); return set_error_from_openssl("Failed to decrypt"); } + offset = len; + } + if (finish_decryption) { + int len = PyBytes_GET_SIZE(plaintext) - offset; + if (0 <= EVP_DecryptFinal_ex(self->ctx, (unsigned char*)PyBytes_AS_STRING(plaintext) + offset, &len)) { + Py_CLEAR(plaintext); return set_error_from_openssl("Failed to finish decryption"); } + offset += len; + self->state = 2; + } + if (offset != PyBytes_GET_SIZE(ciphertext)) { _PyBytes_Resize(&plaintext, offset); if (!plaintext) return NULL; } + return plaintext; +} + +static PyMethodDef aes256gcmdecrypt_methods[] = { + METHODB(add_data_to_be_authenticated_but_not_decrypted, METH_VARARGS), + METHODB(add_data_to_be_decrypted, METH_VARARGS), + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + +PyTypeObject AES256GCMDecrypt_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "fast_data_types.AES256GCMDecrypt", + .tp_basicsize = sizeof(AES256GCMDecrypt), + .tp_dealloc = (destructor)dealloc_aes256gcmdecrypt, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "Decrypt using AES 256 GCM with authentication", + .tp_new = new_aes256gcmdecrypt, + .tp_methods = aes256gcmdecrypt_methods, +}; + +// }}} + static PyMethodDef module_methods[] = { {NULL, NULL, 0, NULL} /* Sentinel */ }; bool init_crypto_library(PyObject *module) { + Crypto_Exception = PyErr_NewException("fast_data_types.CryptoError", NULL, NULL); + if (Crypto_Exception == NULL) return false; + if (PyModule_AddObject(module, "CryptoError", Crypto_Exception) != 0) return false; if (PyModule_AddFunctions(module, module_methods) != 0) return false; - ADD_TYPE(Secret); ADD_TYPE(EllipticCurveKey); + ADD_TYPE(Secret); ADD_TYPE(EllipticCurveKey); ADD_TYPE(AES256GCMEncrypt); ADD_TYPE(AES256GCMDecrypt); if (PyModule_AddIntConstant(module, "X25519", EVP_PKEY_X25519) != 0) return false; #define AI(which) if (PyModule_AddIntMacro(module, which) != 0) return false; AI(SHA1_HASH); AI(SHA224_HASH); AI(SHA256_HASH); AI(SHA384_HASH); AI(SHA512_HASH);