diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst index 1cfd165202d0ef..4ad63bed501735 100644 --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -1259,6 +1259,40 @@ SSL sockets also have the following additional methods and attributes: .. versionchanged:: 3.9 IPv6 address strings no longer have a trailing new line. +.. method:: SSLSocket.get_unverified_chain(binary_form=False) + + Returns an **unverified** certificate chain for the peer. If no chain is + provided, returns :const:`None`. Otherwise returns a tuple of dicts + containing information about the certificates. The chain starts with the + leaf certificate and ends with the root certificate. Return :const:`None` + if the session is resumed as peers do not send certificates. + + If the ``binary_form`` parameter is :const:`True`, and a chain is available, + this method returns a tuple with each element corresponding to the + DER-encoded form of the entire certificate as a sequence of bytes. + + .. versionadded:: 3.10 + + .. warning:: + This is not a verified chain. See :meth:`ssl.SSLSocket.get_verified_chain`. + +.. method:: SSLSocket.get_verified_chain(binary_form=False) + + Returns a verified certificate chain for the peer. If no chain is provided, + returns :const:`None`. Otherwise returns a tuple of dicts containing + information about the certificates. The chain starts with the leaf + certificate and ends with the root certificate. Return :const:`None` if the + session is resumed as peers do not send certificates. + + If the ``binary_form`` parameter is :const:`True`, and a chain is available, + this method returns a tuple with each element corresponding to the + DER-encoded form of the entire certificate as a sequence of bytes. + + .. versionadded:: 3.10 + + .. note:: + This features requires OpenSSL 1.1.0 or newer. + .. method:: SSLSocket.cipher() Returns a three-value tuple containing the name of the cipher being used, the diff --git a/Lib/ssl.py b/Lib/ssl.py index 30f4e5934febf9..b96f8172a2c50d 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -905,6 +905,21 @@ def getpeercert(self, binary_form=False): """ return self._sslobj.getpeercert(binary_form) + def get_unverified_chain(self, binary_form=False): + """"Returns the certificate chain of the SSL connection as a tuple of + dicts. It is *not* a verified chain. + + Return ``None`` if no chain is provided.""" + return self._sslobj.get_unverified_chain(binary_form) + + if hasattr(_ssl._SSLSocket, 'get_verified_chain'): + def get_verified_chain(self, binary_form=False): + """"Returns the verified certificate chain of the SSL connection as a + tuple of dicts. + + Return ``None`` if no chain is provided.""" + return self._sslobj.get_verified_chain(binary_form) + def selected_npn_protocol(self): """Return the currently selected NPN protocol as a string, or ``None`` if a next protocol was not negotiated or if NPN is not supported by one @@ -1123,6 +1138,19 @@ def getpeercert(self, binary_form=False): self._check_connected() return self._sslobj.getpeercert(binary_form) + @_sslcopydoc + def get_unverified_chain(self, binary_form=False): + self._checkClosed() + self._check_connected() + return self._sslobj.get_unverified_chain(binary_form) + + if hasattr(_ssl._SSLSocket, 'get_verified_chain'): + @_sslcopydoc + def get_verified_chain(self, binary_form=False): + self._checkClosed() + self._check_connected() + return self._sslobj.get_verified_chain(binary_form) + @_sslcopydoc def selected_npn_protocol(self): self._checkClosed() diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 67850c34e00c20..2fbffeb1840c6f 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -2160,6 +2160,59 @@ def test_get_ca_certs_capath(self): self.assertTrue(cert) self.assertEqual(len(ctx.get_ca_certs()), 1) + def test_get_unverified_chain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + try: + peer_cert = s.getpeercert() + peer_cert_bin = s.getpeercert(True) + chain_no_validate = s.get_unverified_chain() + chain_bin_no_validate = s.get_unverified_chain(True) + finally: + self.assertTrue(peer_cert) + self.assertTrue(peer_cert_bin) + + # ca cert + ca_certs = ctx.get_ca_certs() + self.assertEqual(len(ca_certs), 1) + + self.assertEqual(chain_no_validate, (peer_cert,)) + self.assertEqual(chain_bin_no_validate, (peer_cert_bin,)) + + def test_get_verified_chain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + try: + peer_cert = s.getpeercert() + peer_cert_bin = s.getpeercert(True) + if IS_OPENSSL_1_1_0: + chain = s.get_verified_chain() + chain_bin = s.get_verified_chain(True) + else: + self.assertFalse(hasattr(s, 'get_verified_chain')) + finally: + self.assertTrue(peer_cert) + self.assertTrue(peer_cert_bin) + if IS_OPENSSL_1_1_0: + self.assertEqual(len(chain), 2) + self.assertEqual(len(chain_bin), 2) + + # ca cert + ca_certs = ctx.get_ca_certs() + self.assertEqual(len(ca_certs), 1) + test_get_ca_certsert = ca_certs[0] + ca_cert_bin = ctx.get_ca_certs(True)[0] + + if IS_OPENSSL_1_1_0: + self.assertEqual(chain, (peer_cert, test_get_ca_certsert)) + self.assertEqual(chain_bin, (peer_cert_bin, ca_cert_bin)) + @needs_sni def test_context_setget(self): # Check that the context of a connected socket can be replaced. diff --git a/Misc/NEWS.d/next/Library/2020-01-10-16-06-13.bpo-18233.EsqH1K.rst b/Misc/NEWS.d/next/Library/2020-01-10-16-06-13.bpo-18233.EsqH1K.rst new file mode 100644 index 00000000000000..25c5280aaae954 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-01-10-16-06-13.bpo-18233.EsqH1K.rst @@ -0,0 +1 @@ +Add :meth:`ssl.SSLSocket.get_unverified_chain` and :meth:`ssl.SSLSocket.get_verified_chain` for accessing the certificate chain of SSL connections. diff --git a/Modules/_ssl.c b/Modules/_ssl.c index 96d2796fcfad48..5c74984aa51cfa 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2096,6 +2096,95 @@ _ssl__SSLSocket_cipher_impl(PySSLSocket *self) return cipher_to_tuple(current); } +static PyObject * +chain_to_pyobject(STACK_OF(X509) *peer_chain, int binary_mode) +{ + int len, i; + PyObject *retval = NULL, *ci=NULL; + + if (peer_chain == NULL) { + Py_RETURN_NONE; + } + + len = sk_X509_num(peer_chain); + if ((retval = PyTuple_New(len)) == NULL) { + return NULL; + } + + for (i = 0; i < len; i++){ + X509 *cert = sk_X509_value(peer_chain, i); + if (binary_mode) { + ci = _certificate_to_der(cert); + } else { + ci = _decode_certificate(cert); + } + + if (ci == NULL) { + Py_CLEAR(retval); + break; + } + PyTuple_SET_ITEM(retval, i, ci); + } + + return retval; +} + +/*[clinic input] +_ssl._SSLSocket.get_unverified_chain + der as binary_mode: bool = False +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_get_unverified_chain_impl(PySSLSocket *self, int binary_mode) +/*[clinic end generated code: output=a84c4e7bb50f3477 input=842931a7d60f135e]*/ +{ + STACK_OF(X509) *peer_chain; /* reference */ + + assert((self->ctx != NULL) && (self->ctx->ctx != NULL)); + if (self->ssl == NULL) + Py_RETURN_NONE; + + peer_chain = SSL_get_peer_cert_chain(self->ssl); + /* In OpenSSL only the client side includes the peer certificate. + * Manually add it if required it to be more consistent. */ + if (self->socket_type == PY_SSL_SERVER) { + X509 *peer_cert = SSL_get_peer_certificate(self->ssl); + if (peer_cert != NULL) { + if (peer_chain == NULL) + peer_chain = sk_X509_new_null(); + sk_X509_insert(peer_chain, peer_cert, 0); + } + } + return chain_to_pyobject(peer_chain, binary_mode); +} + +#ifdef OPENSSL_VERSION_1_1 +/*[clinic input] +_ssl._SSLSocket.get_verified_chain + der as binary_mode: bool = False +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_get_verified_chain_impl(PySSLSocket *self, int binary_mode) +/*[clinic end generated code: output=6e07b709feaeb291 input=8f51efb220ed687f]*/ +{ + STACK_OF(X509) *peer_chain; /* reference */ + + assert((self->ctx != NULL) && (self->ctx->ctx != NULL)); + if (self->ssl == NULL) + Py_RETURN_NONE; + + peer_chain = SSL_get0_verified_chain(self->ssl); + long ret = SSL_get_verify_result(self->ssl); + if (ret != X509_V_OK) { + long e = ERR_PACK(ERR_LIB_SSL, 0, SSL_R_CERTIFICATE_VERIFY_FAILED); + fill_and_set_sslerror(self, PySSLCertVerificationErrorObject, PY_SSL_ERROR_SSL, NULL, __LINE__, e); + return NULL; + } + return chain_to_pyobject(peer_chain, binary_mode); +} +#endif + /*[clinic input] _ssl._SSLSocket.version [clinic start generated code]*/ @@ -3000,6 +3089,10 @@ static PyMethodDef PySSLMethods[] = { _SSL__SSLSOCKET_GET_CHANNEL_BINDING_METHODDEF _SSL__SSLSOCKET_CIPHER_METHODDEF _SSL__SSLSOCKET_SHARED_CIPHERS_METHODDEF + _SSL__SSLSOCKET_GET_UNVERIFIED_CHAIN_METHODDEF +#ifdef OPENSSL_VERSION_1_1 + _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF +#endif _SSL__SSLSOCKET_VERSION_METHODDEF _SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF _SSL__SSLSOCKET_SELECTED_ALPN_PROTOCOL_METHODDEF diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 43469d3c358242..7d1c72c910596f 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -122,6 +122,88 @@ _ssl__SSLSocket_cipher(PySSLSocket *self, PyObject *Py_UNUSED(ignored)) return _ssl__SSLSocket_cipher_impl(self); } +PyDoc_STRVAR(_ssl__SSLSocket_get_unverified_chain__doc__, +"get_unverified_chain($self, /, der=False)\n" +"--\n" +"\n"); + +#define _SSL__SSLSOCKET_GET_UNVERIFIED_CHAIN_METHODDEF \ + {"get_unverified_chain", (PyCFunction)(void(*)(void))_ssl__SSLSocket_get_unverified_chain, METH_FASTCALL|METH_KEYWORDS, _ssl__SSLSocket_get_unverified_chain__doc__}, + +static PyObject * +_ssl__SSLSocket_get_unverified_chain_impl(PySSLSocket *self, int binary_mode); + +static PyObject * +_ssl__SSLSocket_get_unverified_chain(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"der", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "get_unverified_chain", 0}; + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + int binary_mode = 0; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_pos; + } + binary_mode = PyObject_IsTrue(args[0]); + if (binary_mode < 0) { + goto exit; + } +skip_optional_pos: + return_value = _ssl__SSLSocket_get_unverified_chain_impl(self, binary_mode); + +exit: + return return_value; +} + +#if defined(OPENSSL_VERSION_1_1) + +PyDoc_STRVAR(_ssl__SSLSocket_get_verified_chain__doc__, +"get_verified_chain($self, /, der=False)\n" +"--\n" +"\n"); + +#define _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF \ + {"get_verified_chain", (PyCFunction)(void(*)(void))_ssl__SSLSocket_get_verified_chain, METH_FASTCALL|METH_KEYWORDS, _ssl__SSLSocket_get_verified_chain__doc__}, + +static PyObject * +_ssl__SSLSocket_get_verified_chain_impl(PySSLSocket *self, int binary_mode); + +static PyObject * +_ssl__SSLSocket_get_verified_chain(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"der", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "get_verified_chain", 0}; + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + int binary_mode = 0; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_pos; + } + binary_mode = PyObject_IsTrue(args[0]); + if (binary_mode < 0) { + goto exit; + } +skip_optional_pos: + return_value = _ssl__SSLSocket_get_verified_chain_impl(self, binary_mode); + +exit: + return return_value; +} + +#endif /* defined(OPENSSL_VERSION_1_1) */ + PyDoc_STRVAR(_ssl__SSLSocket_version__doc__, "version($self, /)\n" "--\n" @@ -1420,6 +1502,10 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #endif /* defined(_MSC_VER) */ +#ifndef _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF + #define _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF +#endif /* !defined(_SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF) */ + #ifndef _SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF #define _SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF #endif /* !defined(_SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF) */ @@ -1447,4 +1533,4 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=2bb53a80040c9b35 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=3f880c7260e778fd input=a9049054013a1b77]*/