diff --git a/changelog.d/18133.misc b/changelog.d/18133.misc
new file mode 100644
index 00000000000..151ceb2cab3
--- /dev/null
+++ b/changelog.d/18133.misc
@@ -0,0 +1 @@
+Disable statement timeout during room purge.
diff --git a/changelog.d/18232.feature b/changelog.d/18232.feature
new file mode 100644
index 00000000000..ba5059ba80a
--- /dev/null
+++ b/changelog.d/18232.feature
@@ -0,0 +1 @@
+Add `passthrough_authorization_parameters` to the OIDC configuration to allow passing parameters to the authorization grant URL.
diff --git a/changelog.d/18294.docker b/changelog.d/18294.docker
new file mode 100644
index 00000000000..cc40ca90c0f
--- /dev/null
+++ b/changelog.d/18294.docker
@@ -0,0 +1 @@
+Optimize the build of the complement-synapse image.
diff --git a/changelog.d/18334.bugfix b/changelog.d/18334.bugfix
new file mode 100644
index 00000000000..d82e522cb89
--- /dev/null
+++ b/changelog.d/18334.bugfix
@@ -0,0 +1 @@
+Fix `force_tracing_for_users` config when using delegated auth.
diff --git a/changelog.d/18335.bugfix b/changelog.d/18335.bugfix
new file mode 100644
index 00000000000..50df5a1b1d7
--- /dev/null
+++ b/changelog.d/18335.bugfix
@@ -0,0 +1 @@
+Fix the token introspection cache logging access tokens when MAS integration is in use.
\ No newline at end of file
diff --git a/changelog.d/18337.misc b/changelog.d/18337.misc
new file mode 100644
index 00000000000..b78276fe765
--- /dev/null
+++ b/changelog.d/18337.misc
@@ -0,0 +1 @@
+Add cache to storage functions used to auth requests when using delegated auth.
diff --git a/changelog.d/18339.bugfix b/changelog.d/18339.bugfix
new file mode 100644
index 00000000000..09d6d734200
--- /dev/null
+++ b/changelog.d/18339.bugfix
@@ -0,0 +1 @@
+Stop caching introspection failures when delegating auth to MAS.
diff --git a/changelog.d/18342.bugfix b/changelog.d/18342.bugfix
new file mode 100644
index 00000000000..6fa2fa679a9
--- /dev/null
+++ b/changelog.d/18342.bugfix
@@ -0,0 +1 @@
+Fix `ExternalIDReuse` exception after migrating to MAS on workers with high traffic.
diff --git a/changelog.d/18345.bugfix b/changelog.d/18345.bugfix
new file mode 100644
index 00000000000..c8a001d4a3f
--- /dev/null
+++ b/changelog.d/18345.bugfix
@@ -0,0 +1 @@
+Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0.
diff --git a/docker/complement/Dockerfile b/docker/complement/Dockerfile
index 3e7f808cc56..dd029c5fbc3 100644
--- a/docker/complement/Dockerfile
+++ b/docker/complement/Dockerfile
@@ -25,7 +25,7 @@ FROM $FROM
 RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
 COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql
 COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql
-RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
+COPY --from=postgres_base --chown=postgres /var/run/postgresql /var/run/postgresql
 ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
 ENV PGDATA=/var/lib/postgresql/data
 
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index d2d282f2037..73fd9622cec 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3672,6 +3672,9 @@ Options for each entry include:
 * `additional_authorization_parameters`: String to string dictionary that will be passed as
   additional parameters to the authorization grant URL.
 
+* `passthrough_authorization_parameters`: List of parameters that will be passed through
+  from the redirect endpoint to the authorization grant URL.
+
 * `allow_existing_users`: set to true to allow a user logging in via OIDC to
   match a pre-existing account instead of failing. This could be used if
   switching from password logins to OIDC. Defaults to false.
@@ -3798,6 +3801,7 @@ oidc_providers:
     jwks_uri: "https://accounts.example.com/.well-known/jwks.json"
     additional_authorization_parameters:
       acr_values: 2fa
+    passthrough_authorization_parameters: ["login_hint"]
    skip_verification: true
    enable_registration: true
    user_mapping_provider:
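Taken together, the documentation and config hunks above define an allow-list: only the named query parameters on the redirect request are forwarded to the authorization grant URL, everything else is dropped. A minimal standalone sketch of that behaviour (plain Python, not Synapse code; endpoint and parameter names are illustrative):

```python
# Sketch of the passthrough allow-list described above.
from typing import Dict, List
from urllib.parse import urlencode


def build_grant_uri(
    authorization_endpoint: str,
    base_params: Dict[str, str],
    passthrough_allowlist: List[str],
    request_args: Dict[str, str],
) -> str:
    params = dict(base_params)
    for name in passthrough_allowlist:
        value = request_args.get(name)
        if value:  # empty/missing values are skipped, mirroring the handler
            params[name] = value
    return f"{authorization_endpoint}?{urlencode(params)}"


# With passthrough_authorization_parameters: ["login_hint"], a redirect
# request carrying ?login_hint=alice&evil=1 forwards login_hint only:
print(
    build_grant_uri(
        "https://accounts.example.com/authorize",
        {"client_id": "synapse", "response_type": "code"},
        ["login_hint"],
        {"login_hint": "alice", "evil": "1"},
    )
)
```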
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py
index 74e526123fd..9ded3366e3f 100644
--- a/synapse/api/auth/msc3861_delegated.py
+++ b/synapse/api/auth/msc3861_delegated.py
@@ -45,10 +45,11 @@
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.opentracing import active_span, force_tracing, start_active_span
 from synapse.types import Requester, UserID, create_requester
 from synapse.util import json_decoder
 from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
 
 if TYPE_CHECKING:
     from synapse.rest.admin.experimental_features import ExperimentalFeature
@@ -177,6 +178,7 @@ def __init__(self, hs: "HomeServer"):
         self._http_client = hs.get_proxied_http_client()
         self._hostname = hs.hostname
         self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
+        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
 
         # # Token Introspection Cache
         # This remembers what users/devices are represented by which access tokens,
@@ -201,6 +203,8 @@ def __init__(self, hs: "HomeServer"):
             self._clock,
             "token_introspection",
             timeout_ms=120_000,
+            # don't log because the keys are access tokens
+            enable_logging=False,
         )
 
         self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
@@ -275,7 +279,9 @@ async def _introspection_endpoint(self) -> str:
         metadata = await self._issuer_metadata.get()
         return metadata.get("introspection_endpoint")
 
-    async def _introspect_token(self, token: str) -> IntrospectionResult:
+    async def _introspect_token(
+        self, token: str, cache_context: ResponseCacheContext[str]
+    ) -> IntrospectionResult:
         """
         Send a token to the introspection endpoint and returns the introspection response
 
@@ -291,6 +297,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult:
         Returns:
             The introspection response
         """
+        # By default, we shouldn't cache the result unless we know it's valid
+        cache_context.should_cache = False
         introspection_endpoint = await self._introspection_endpoint()
         raw_headers: Dict[str, str] = {
             "Content-Type": "application/x-www-form-urlencoded",
@@ -348,6 +356,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult:
                 "The introspection endpoint returned an invalid JSON response."
             )
 
+        # We had a valid response, so we can cache it
+        cache_context.should_cache = True
         return IntrospectionResult(
             IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec()
         )
@@ -361,6 +371,55 @@ async def get_user_by_req(
         allow_guest: bool = False,
         allow_expired: bool = False,
         allow_locked: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+        parent_span = active_span()
+        with start_active_span("get_user_by_req"):
+            requester = await self._wrapped_get_user_by_req(
+                request, allow_guest, allow_expired, allow_locked
+            )
+
+            if parent_span:
+                if requester.authenticated_entity in self._force_tracing_for_users:
+                    # request tracing is enabled for this user, so we need to force
+                    # tracing on for the parent span (which will be the servlet span).
+                    #
+                    # It's too late for the get_user_by_req span to inherit the setting,
+                    # so we also force it on for that.
+                    force_tracing()
+                    force_tracing(parent_span)
+                parent_span.set_tag(
+                    "authenticated_entity", requester.authenticated_entity
+                )
+                parent_span.set_tag("user_id", requester.user.to_string())
+                if requester.device_id is not None:
+                    parent_span.set_tag("device_id", requester.device_id)
+                if requester.app_service is not None:
+                    parent_span.set_tag("appservice_id", requester.app_service.id)
+            return requester
+
+    async def _wrapped_get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+        allow_locked: bool = False,
     ) -> Requester:
         access_token = self.get_access_token_from_request(request)
 
@@ -429,7 +488,7 @@ async def get_user_by_access_token(
 
         try:
             introspection_result = await self._introspection_cache.wrap(
-                token, self._introspect_token, token
+                token, self._introspect_token, token, cache_context=True
             )
         except Exception:
             logger.exception("Failed to introspect token")
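The introspection hunks above combine two behaviours: results are cached only once the endpoint has returned a valid response, and failures always propagate uncached so they can be retried. A reduced standalone sketch of that pattern (a simplified stand-in, not Synapse's `ResponseCache`):

```python
# "Cache only successes" via a context flag the callback controls.
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class CacheContext:
    """Passed to the callback so it can veto caching of its own result."""

    def __init__(self) -> None:
        self.should_cache = True


class TinyResponseCache:
    def __init__(self, timeout_s: float) -> None:
        self._timeout_s = timeout_s
        self._cache: Dict[str, Tuple[float, Any]] = {}

    async def wrap(
        self, key: str, cb: Callable[[CacheContext], Awaitable[Any]]
    ) -> Any:
        hit = self._cache.get(key)
        if hit is not None and hit[0] > time.monotonic():
            return hit[1]
        context = CacheContext()
        # If cb raises, nothing is cached: failed introspections get retried.
        result = await cb(context)
        if context.should_cache:
            self._cache[key] = (time.monotonic() + self._timeout_s, result)
        return result


async def introspect(context: CacheContext) -> dict:
    context.should_cache = False  # pessimistic until the response validates
    response = {"active": True}   # stand-in for the real HTTP call
    context.should_cache = True   # only reached for a valid response
    return response
```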
diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py
index 8ba0ba2c360..3ddf65a3e91 100644
--- a/synapse/config/oidc.py
+++ b/synapse/config/oidc.py
@@ -356,6 +356,9 @@ def _parse_oidc_config_dict(
         additional_authorization_parameters=oidc_config.get(
             "additional_authorization_parameters", {}
         ),
+        passthrough_authorization_parameters=oidc_config.get(
+            "passthrough_authorization_parameters", []
+        ),
     )
 
 
@@ -501,3 +504,6 @@ class OidcProviderConfig:
 
     # Additional parameters that will be passed to the authorization grant URL
     additional_authorization_parameters: Mapping[str, str]
+
+    # Query parameters from the redirect endpoint that will be passed through to the authorization grant URL
+    passthrough_authorization_parameters: Collection[str]
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d9622080b4f..1efd039f227 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -163,6 +163,8 @@ async def get_device(self, user_id: str, device_id: str) -> JsonDict:
             raise errors.NotFoundError()
 
         ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
+
+        device = dict(device)
         _update_device_from_client_ips(device, ips)
 
         set_tag("device", str(device))
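The `device = dict(device)` line above exists because `get_device` becomes `@cached` later in this diff (see the `devices.py` hunks) and therefore returns a shared `Mapping` that must not be mutated in place. A toy illustration of the failure mode, not Synapse code:

```python
# Mutating a cached value corrupts the cache for every later caller.
from typing import Any, Dict, Mapping

_cache: Dict[str, Mapping[str, Any]] = {}


def get_device_cached(device_id: str) -> Mapping[str, Any]:
    if device_id not in _cache:
        _cache[device_id] = {"device_id": device_id, "display_name": "laptop"}
    return _cache[device_id]


device = get_device_cached("DEV1")
# BAD: device["last_seen_ip"] = "10.0.0.1" would leak into the cache.
# GOOD: copy first, then mutate the copy.
device = dict(device)
device["last_seen_ip"] = "10.0.0.1"
assert "last_seen_ip" not in get_device_cached("DEV1")
```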
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 18efdd9f6ee..c4cf0636a3a 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -467,6 +467,10 @@ def __init__(
 
         self._sso_handler.register_identity_provider(self)
 
+        self.passthrough_authorization_parameters = (
+            provider.passthrough_authorization_parameters
+        )
+
     def _validate_metadata(self, m: OpenIDProviderMetadata) -> None:
         """Verifies the provider metadata.
 
@@ -1005,7 +1009,6 @@ async def handle_redirect_request(
                 when everything is done (or None for UI Auth)
             ui_auth_session_id: The session ID of the ongoing UI Auth (or
                 None if this is a login).
-
         Returns:
             The redirect URL to the authorization endpoint.
 
@@ -1078,6 +1081,13 @@ async def handle_redirect_request(
             )
         )
 
+        # add passthrough authorization parameters from the redirect request
+        passthrough_authorization_parameters = self.passthrough_authorization_parameters
+        for parameter in passthrough_authorization_parameters:
+            parameter_value = parse_string(request, parameter)
+            if parameter_value:
+                additional_authorization_parameters.update({parameter: parameter_value})
+
         authorization_endpoint = metadata.get("authorization_endpoint")
         return prepare_grant_uri(
             authorization_endpoint,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 0b6d1f2b050..3f0b2f5d848 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -282,9 +282,10 @@ def count_devices_by_users_txn(
             "count_devices_by_users", count_devices_by_users_txn, user_ids
         )
 
+    @cached()
     async def get_device(
         self, user_id: str, device_id: str
-    ) -> Optional[Dict[str, Any]]:
+    ) -> Optional[Mapping[str, Any]]:
         """Retrieve a device. Only returns devices that are not marked as
         hidden.
 
@@ -1817,6 +1818,8 @@ async def store_device(
             },
             desc="store_device",
         )
+        await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
+
         if not inserted:
             # if the device already exists, check if it's a real device, or
             # if the device ID is reserved by something else
@@ -1882,6 +1885,9 @@ def _delete_devices_txn(txn: LoggingTransaction, device_ids: List[str]) -> None:
                 values=device_ids,
                 keyvalues={"user_id": user_id},
             )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
+            )
 
         for batch in batch_iter(device_ids, 100):
             await self.db_pool.runInteraction(
@@ -1915,6 +1921,7 @@ async def update_device(
             updatevalues=updates,
             desc="update_device",
         )
+        await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
 
     async def update_remote_device_list_cache_entry(
         self, user_id: str, device_id: str, content: JsonDict, stream_id: str
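The `devices.py` hunks above pair every write path with an explicit invalidation of the new `get_device` cache, including a bulk variant inside the delete transaction. The shape of that pattern, reduced to a standalone sketch with a plain dict cache (not Synapse's cache machinery):

```python
from typing import Any, Dict, List, Optional, Tuple


class TinyDeviceStore:
    """Dict-backed stand-in for the cached get_device pattern above."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Dict[str, Any]] = {}
        self._cache: Dict[Tuple[str, str], Optional[Dict[str, Any]]] = {}

    def get_device(self, user_id: str, device_id: str) -> Optional[Dict[str, Any]]:
        key = (user_id, device_id)
        if key not in self._cache:  # negative results are cached too
            self._cache[key] = self._rows.get(key)
        return self._cache[key]

    def store_device(self, user_id: str, device_id: str, name: str) -> None:
        self._rows[(user_id, device_id)] = {"display_name": name}
        # Every write path must drop the cache entry, or readers keep seeing
        # the pre-write value (Synapse also streams this to other workers).
        self._cache.pop((user_id, device_id), None)

    def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
        for device_id in device_ids:
            self._rows.pop((user_id, device_id), None)
            self._cache.pop((user_id, device_id), None)  # bulk invalidation
```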
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 8380930c70e..c43f31353b6 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -759,17 +759,37 @@ def _record_user_external_id_txn(
             external_id: id on that system
             user_id: complete mxid that it is mapped to
         """
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_by_external_id, (auth_provider, external_id)
+        )
 
-        self.db_pool.simple_insert_txn(
+        # This INSERT ... ON CONFLICT DO NOTHING statement will cause a
+        # 'could not serialize access due to concurrent update'
+        # if the row is added concurrently by another transaction.
+        # This is exactly what we want, as it makes the transaction get retried
+        # in a new snapshot where we can check for a genuine conflict.
+        was_inserted = self.db_pool.simple_upsert_txn(
             txn,
             table="user_external_ids",
-            values={
-                "auth_provider": auth_provider,
-                "external_id": external_id,
-                "user_id": user_id,
-            },
+            keyvalues={"auth_provider": auth_provider, "external_id": external_id},
+            values={},
+            insertion_values={"user_id": user_id},
         )
 
+        if not was_inserted:
+            existing_id = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="user_external_ids",
+                keyvalues={"auth_provider": auth_provider, "user_id": user_id},
+                retcol="external_id",
+                allow_none=True,
+            )
+
+            if existing_id != external_id:
+                raise ExternalIDReuseException(
+                    f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}"
+                )
+
     async def remove_user_external_id(
         self, auth_provider: str, external_id: str, user_id: str
     ) -> None:
@@ -789,6 +809,9 @@ async def remove_user_external_id(
             },
             desc="remove_user_external_id",
         )
+        await self.invalidate_cache_and_stream(
+            "get_user_by_external_id", (auth_provider, external_id)
+        )
 
     async def replace_user_external_id(
         self,
@@ -809,29 +832,20 @@ async def replace_user_external_id(
             ExternalIDReuseException if the new external_id could not be mapped.
         """
 
-        def _remove_user_external_ids_txn(
+        def _replace_user_external_id_txn(
             txn: LoggingTransaction,
-            user_id: str,
         ) -> None:
-            """Remove all mappings from external user ids to a mxid
-            If these mappings are not found, this method does nothing.
-
-            Args:
-                user_id: complete mxid that it is mapped to
-            """
-
             self.db_pool.simple_delete_txn(
                 txn,
                 table="user_external_ids",
                 keyvalues={"user_id": user_id},
             )
 
-        def _replace_user_external_id_txn(
-            txn: LoggingTransaction,
-        ) -> None:
-            _remove_user_external_ids_txn(txn, user_id)
-
             for auth_provider, external_id in record_external_ids:
+                self._invalidate_cache_and_stream(
+                    txn, self.get_user_by_external_id, (auth_provider, external_id)
+                )
+
                 self._record_user_external_id_txn(
                     txn,
                     auth_provider,
@@ -847,6 +861,7 @@ def _replace_user_external_id_txn(
         except self.database_engine.module.IntegrityError:
             raise ExternalIDReuseException()
 
+    @cached()
     async def get_user_by_external_id(
         self, auth_provider: str, external_id: str
     ) -> Optional[str]:
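The rewritten `_record_user_external_id_txn` above swaps a bare INSERT for insert-if-absent plus a post-hoc check, which is what lets concurrent re-registration races resolve to either an idempotent success or an `ExternalIDReuseException`. A simplified sqlite3 sketch of the same flow (the verification query is reduced to a direct lookup; table and names are illustrative, and `simple_upsert_txn` wraps the equivalent SQL in Synapse):

```python
import sqlite3


class ExternalIDReuseError(Exception):
    pass


def record_external_id(
    conn: sqlite3.Connection, provider: str, external_id: str, user_id: str
) -> None:
    cur = conn.execute(
        """
        INSERT INTO user_external_ids (auth_provider, external_id, user_id)
        VALUES (?, ?, ?)
        ON CONFLICT (auth_provider, external_id) DO NOTHING
        """,
        (provider, external_id, user_id),
    )
    if cur.rowcount == 0:
        # Row already existed: only fine if it maps to the same user.
        row = conn.execute(
            "SELECT user_id FROM user_external_ids"
            " WHERE auth_provider = ? AND external_id = ?",
            (provider, external_id),
        ).fetchone()
        if row is None or row[0] != user_id:
            raise ExternalIDReuseError(
                f"{external_id!r} already mapped for {provider}"
            )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_external_ids (auth_provider TEXT, external_id TEXT,"
    " user_id TEXT, UNIQUE (auth_provider, external_id))"
)
record_external_id(conn, "oidc-mas", "abc123", "@alice:example.com")
record_external_id(conn, "oidc-mas", "abc123", "@alice:example.com")  # idempotent
```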
""" txn.execute(sql, (user_id, room_id)) res = txn.fetchone() diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 90d7beb92fe..c1a66dcba02 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -48,6 +48,7 @@ LoggingTransaction, ) from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateKey, StateMap @@ -914,6 +915,12 @@ def _purge_room_state_txn( ) -> None: # Delete all edges that reference a state group linked to room_id logger.info("[purge] removing %s from state_group_edges", room_id) + + if isinstance(self.database_engine, PostgresEngine): + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + txn.execute( """ DELETE FROM state_group_edges AS sge WHERE sge.state_group IN ( diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 96b7ca83dcb..54b99134b9c 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]): used rather than trying to compute a new response. """ - def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): + def __init__( + self, + clock: Clock, + name: str, + timeout_ms: float = 0, + enable_logging: bool = True, + ): self._result_cache: Dict[KV, ResponseCacheEntry] = {} self.clock = clock @@ -109,6 +115,7 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): self._name = name self._metrics = register_cache("response_cache", name, self, resizable=False) + self._enable_logging = enable_logging def size(self) -> int: return len(self._result_cache) @@ -246,9 +253,12 @@ async def handle_request(request): """ entry = self._get(key) if not entry: - logger.debug( - "[%s]: no cached result for [%s], calculating new one", self._name, key - ) + if self._enable_logging: + logger.debug( + "[%s]: no cached result for [%s], calculating new one", + self._name, + key, + ) context = ResponseCacheContext(cache_key=key) if cache_context: kwargs["cache_context"] = context @@ -269,12 +279,15 @@ async def cb() -> RV: return await make_deferred_yieldable(entry.result.observe()) result = entry.result.observe() - if result.called: - logger.info("[%s]: using completed cached result for [%s]", self._name, key) - else: - logger.info( - "[%s]: using incomplete cached result for [%s]", self._name, key - ) + if self._enable_logging: + if result.called: + logger.info( + "[%s]: using completed cached result for [%s]", self._name, key + ) + else: + logger.info( + "[%s]: using incomplete cached result for [%s]", self._name, key + ) span_context = entry.opentracing_span_context with start_active_span_follows_from( diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index cfd9969563e..a7cead83d0d 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -484,6 +484,32 @@ def test_redirect_request(self) -> None: self.assertEqual(code_verifier, "") self.assertEqual(redirect, "http://client/redirect") + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "passthrough_authorization_parameters": ["additional_parameter"], + } + } + ) + def test_passthrough_parameters(self) -> None: + """The 
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 96b7ca83dcb..54b99134b9c 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
+    def __init__(
+        self,
+        clock: Clock,
+        name: str,
+        timeout_ms: float = 0,
+        enable_logging: bool = True,
+    ):
         self._result_cache: Dict[KV, ResponseCacheEntry] = {}
 
         self.clock = clock
@@ -109,6 +115,7 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
         self._name = name
 
         self._metrics = register_cache("response_cache", name, self, resizable=False)
+        self._enable_logging = enable_logging
 
     def size(self) -> int:
         return len(self._result_cache)
@@ -246,9 +253,12 @@ async def handle_request(request):
         """
         entry = self._get(key)
         if not entry:
-            logger.debug(
-                "[%s]: no cached result for [%s], calculating new one", self._name, key
-            )
+            if self._enable_logging:
+                logger.debug(
+                    "[%s]: no cached result for [%s], calculating new one",
+                    self._name,
+                    key,
+                )
             context = ResponseCacheContext(cache_key=key)
             if cache_context:
                 kwargs["cache_context"] = context
@@ -269,12 +279,15 @@ async def cb() -> RV:
             return await make_deferred_yieldable(entry.result.observe())
 
         result = entry.result.observe()
-        if result.called:
-            logger.info("[%s]: using completed cached result for [%s]", self._name, key)
-        else:
-            logger.info(
-                "[%s]: using incomplete cached result for [%s]", self._name, key
-            )
+        if self._enable_logging:
+            if result.called:
+                logger.info(
+                    "[%s]: using completed cached result for [%s]", self._name, key
+                )
+            else:
+                logger.info(
+                    "[%s]: using incomplete cached result for [%s]", self._name, key
+                )
 
         span_context = entry.opentracing_span_context
         with start_active_span_follows_from(
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index cfd9969563e..a7cead83d0d 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -484,6 +484,32 @@ def test_redirect_request(self) -> None:
         self.assertEqual(code_verifier, "")
         self.assertEqual(redirect, "http://client/redirect")
 
+    @override_config(
+        {
+            "oidc_config": {
+                **DEFAULT_CONFIG,
+                "passthrough_authorization_parameters": ["additional_parameter"],
+            }
+        }
+    )
+    def test_passthrough_parameters(self) -> None:
+        """The redirect request has additional parameters, one is authorized, one is not"""
+        req = Mock(spec=["cookies", "args"])
+        req.cookies = []
+        req.args = {}
+        req.args[b"additional_parameter"] = ["a_value".encode("utf-8")]
+        req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")]
+
+        url = urlparse(
+            self.get_success(
+                self.provider.handle_redirect_request(req, b"http://client/redirect")
+            )
+        )
+
+        params = parse_qs(url.query)
+        self.assertEqual(params["additional_parameter"], ["a_value"])
+        self.assertNotIn("not_authorized_parameter", params)
+
     @override_config({"oidc_config": DEFAULT_CONFIG})
     def test_redirect_request_with_code_challenge(self) -> None:
         """The redirect request has the right arguments & generates a valid session cookie."""
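The `enable_logging` flag above leaves cache behaviour untouched and only suppresses the log lines that embed the cache key, which matters when keys are access tokens, as in the introspection cache earlier in this diff. A reduced standalone sketch of the same gate (not Synapse's `ResponseCache`):

```python
import logging
from typing import Dict

logger = logging.getLogger(__name__)


class TinyCache:
    def __init__(self, name: str, enable_logging: bool = True) -> None:
        self._name = name
        self._cache: Dict[str, str] = {}
        self._enable_logging = enable_logging

    def get(self, key: str) -> str:
        if key not in self._cache:
            if self._enable_logging:
                logger.debug("[%s]: no cached result for [%s]", self._name, key)
            self._cache[key] = self._compute(key)
        elif self._enable_logging:
            logger.info("[%s]: using cached result for [%s]", self._name, key)
        return self._cache[key]

    def _compute(self, key: str) -> str:
        return key.upper()


# Keys here would be access tokens, so key logging stays off:
cache = TinyCache("token_introspection", enable_logging=False)
```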