diff --git a/changelog.d/18133.misc b/changelog.d/18133.misc
new file mode 100644
index 00000000000..151ceb2cab3
--- /dev/null
+++ b/changelog.d/18133.misc
@@ -0,0 +1 @@
+Disable statement timeout during room purge.
diff --git a/changelog.d/18232.feature b/changelog.d/18232.feature
new file mode 100644
index 00000000000..ba5059ba80a
--- /dev/null
+++ b/changelog.d/18232.feature
@@ -0,0 +1 @@
+Add `passthrough_authorization_parameters` in OIDC configuration to allow to pass parameters to the authorization grant URL.
diff --git a/changelog.d/18294.docker b/changelog.d/18294.docker
new file mode 100644
index 00000000000..cc40ca90c0f
--- /dev/null
+++ b/changelog.d/18294.docker
@@ -0,0 +1 @@
+Optimize the build of the complement-synapse image.
diff --git a/changelog.d/18334.bugfix b/changelog.d/18334.bugfix
new file mode 100644
index 00000000000..d82e522cb89
--- /dev/null
+++ b/changelog.d/18334.bugfix
@@ -0,0 +1 @@
+Fix `force_tracing_for_users` config when using delegated auth.
diff --git a/changelog.d/18335.bugfix b/changelog.d/18335.bugfix
new file mode 100644
index 00000000000..50df5a1b1d7
--- /dev/null
+++ b/changelog.d/18335.bugfix
@@ -0,0 +1 @@
+Fix the token introspection cache logging access tokens when MAS integration is in use.
\ No newline at end of file
diff --git a/changelog.d/18337.misc b/changelog.d/18337.misc
new file mode 100644
index 00000000000..b78276fe765
--- /dev/null
+++ b/changelog.d/18337.misc
@@ -0,0 +1 @@
+Add cache to storage functions used to auth requests when using delegated auth.
diff --git a/changelog.d/18339.bugfix b/changelog.d/18339.bugfix
new file mode 100644
index 00000000000..09d6d734200
--- /dev/null
+++ b/changelog.d/18339.bugfix
@@ -0,0 +1 @@
+Stop caching introspection failures when delegating auth to MAS.
diff --git a/changelog.d/18342.bugfix b/changelog.d/18342.bugfix
new file mode 100644
index 00000000000..6fa2fa679a9
--- /dev/null
+++ b/changelog.d/18342.bugfix
@@ -0,0 +1 @@
+Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic.
diff --git a/changelog.d/18345.bugfix b/changelog.d/18345.bugfix
new file mode 100644
index 00000000000..c8a001d4a3f
--- /dev/null
+++ b/changelog.d/18345.bugfix
@@ -0,0 +1 @@
+Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0.
diff --git a/docker/complement/Dockerfile b/docker/complement/Dockerfile
index 3e7f808cc56..dd029c5fbc3 100644
--- a/docker/complement/Dockerfile
+++ b/docker/complement/Dockerfile
@@ -25,7 +25,7 @@ FROM $FROM
 RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
 COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql
 COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql
-RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
+COPY --from=postgres_base --chown=postgres /var/run/postgresql /var/run/postgresql
 ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
 ENV PGDATA=/var/lib/postgresql/data
 
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index d2d282f2037..73fd9622cec 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3672,6 +3672,9 @@ Options for each entry include:
 * `additional_authorization_parameters`: String to string dictionary that will be passed as
    additional parameters to the authorization grant URL.
 
+* `passthrough_authorization_parameters`: List of parameters that will be passed through from the redirect endpoint 
+   to the authorization grant URL.
+
 * `allow_existing_users`: set to true to allow a user logging in via OIDC to
    match a pre-existing account instead of failing. This could be used if
    switching from password logins to OIDC. Defaults to false.
@@ -3798,6 +3801,7 @@ oidc_providers:
     jwks_uri: "https://accounts.example.com/.well-known/jwks.json"
     additional_authorization_parameters:
       acr_values: 2fa
+    passthrough_authorization_parameters: ["login_hint"]
     skip_verification: true
     enable_registration: true
     user_mapping_provider:
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py
index 74e526123fd..9ded3366e3f 100644
--- a/synapse/api/auth/msc3861_delegated.py
+++ b/synapse/api/auth/msc3861_delegated.py
@@ -45,10 +45,11 @@
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.opentracing import active_span, force_tracing, start_active_span
 from synapse.types import Requester, UserID, create_requester
 from synapse.util import json_decoder
 from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
 
 if TYPE_CHECKING:
     from synapse.rest.admin.experimental_features import ExperimentalFeature
@@ -177,6 +178,7 @@ def __init__(self, hs: "HomeServer"):
         self._http_client = hs.get_proxied_http_client()
         self._hostname = hs.hostname
         self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
+        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
 
         # # Token Introspection Cache
         # This remembers what users/devices are represented by which access tokens,
@@ -201,6 +203,8 @@ def __init__(self, hs: "HomeServer"):
             self._clock,
             "token_introspection",
             timeout_ms=120_000,
+            # don't log because the keys are access tokens
+            enable_logging=False,
         )
 
         self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
@@ -275,7 +279,9 @@ async def _introspection_endpoint(self) -> str:
         metadata = await self._issuer_metadata.get()
         return metadata.get("introspection_endpoint")
 
-    async def _introspect_token(self, token: str) -> IntrospectionResult:
+    async def _introspect_token(
+        self, token: str, cache_context: ResponseCacheContext[str]
+    ) -> IntrospectionResult:
         """
         Send a token to the introspection endpoint and returns the introspection response
 
@@ -291,6 +297,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult:
         Returns:
             The introspection response
         """
+        # By default, we shouldn't cache the result unless we know it's valid
+        cache_context.should_cache = False
         introspection_endpoint = await self._introspection_endpoint()
         raw_headers: Dict[str, str] = {
             "Content-Type": "application/x-www-form-urlencoded",
@@ -348,6 +356,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult:
                 "The introspection endpoint returned an invalid JSON response."
             )
 
+        # We had a valid response, so we can cache it
+        cache_context.should_cache = True
         return IntrospectionResult(
             IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec()
         )
@@ -361,6 +371,55 @@ async def get_user_by_req(
         allow_guest: bool = False,
         allow_expired: bool = False,
         allow_locked: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+        parent_span = active_span()
+        with start_active_span("get_user_by_req"):
+            requester = await self._wrapped_get_user_by_req(
+                request, allow_guest, allow_expired, allow_locked
+            )
+
+            if parent_span:
+                if requester.authenticated_entity in self._force_tracing_for_users:
+                    # request tracing is enabled for this user, so we need to force it
+                    # tracing on for the parent span (which will be the servlet span).
+                    #
+                    # It's too late for the get_user_by_req span to inherit the setting,
+                    # so we also force it on for that.
+                    force_tracing()
+                    force_tracing(parent_span)
+                parent_span.set_tag(
+                    "authenticated_entity", requester.authenticated_entity
+                )
+                parent_span.set_tag("user_id", requester.user.to_string())
+                if requester.device_id is not None:
+                    parent_span.set_tag("device_id", requester.device_id)
+                if requester.app_service is not None:
+                    parent_span.set_tag("appservice_id", requester.app_service.id)
+            return requester
+
+    async def _wrapped_get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+        allow_locked: bool = False,
     ) -> Requester:
         access_token = self.get_access_token_from_request(request)
 
@@ -429,7 +488,7 @@ async def get_user_by_access_token(
 
         try:
             introspection_result = await self._introspection_cache.wrap(
-                token, self._introspect_token, token
+                token, self._introspect_token, token, cache_context=True
             )
         except Exception:
             logger.exception("Failed to introspect token")
diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py
index 8ba0ba2c360..3ddf65a3e91 100644
--- a/synapse/config/oidc.py
+++ b/synapse/config/oidc.py
@@ -356,6 +356,9 @@ def _parse_oidc_config_dict(
         additional_authorization_parameters=oidc_config.get(
             "additional_authorization_parameters", {}
         ),
+        passthrough_authorization_parameters=oidc_config.get(
+            "passthrough_authorization_parameters", []
+        ),
     )
 
 
@@ -501,3 +504,6 @@ class OidcProviderConfig:
 
     # Additional parameters that will be passed to the authorization grant URL
     additional_authorization_parameters: Mapping[str, str]
+
+    # Allow query parameters to the redirect endpoint that will be passed to the authorization grant URL
+    passthrough_authorization_parameters: Collection[str]
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d9622080b4f..1efd039f227 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -163,6 +163,8 @@ async def get_device(self, user_id: str, device_id: str) -> JsonDict:
             raise errors.NotFoundError()
 
         ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
+
+        device = dict(device)
         _update_device_from_client_ips(device, ips)
 
         set_tag("device", str(device))
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 18efdd9f6ee..c4cf0636a3a 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -467,6 +467,10 @@ def __init__(
 
         self._sso_handler.register_identity_provider(self)
 
+        self.passthrough_authorization_parameters = (
+            provider.passthrough_authorization_parameters
+        )
+
     def _validate_metadata(self, m: OpenIDProviderMetadata) -> None:
         """Verifies the provider metadata.
 
@@ -1005,7 +1009,6 @@ async def handle_redirect_request(
                 when everything is done (or None for UI Auth)
             ui_auth_session_id: The session ID of the ongoing UI Auth (or
                 None if this is a login).
-
         Returns:
             The redirect URL to the authorization endpoint.
 
@@ -1078,6 +1081,13 @@ async def handle_redirect_request(
                 )
             )
 
+        # add passthrough additional authorization parameters
+        passthrough_authorization_parameters = self.passthrough_authorization_parameters
+        for parameter in passthrough_authorization_parameters:
+            parameter_value = parse_string(request, parameter)
+            if parameter_value:
+                additional_authorization_parameters.update({parameter: parameter_value})
+
         authorization_endpoint = metadata.get("authorization_endpoint")
         return prepare_grant_uri(
             authorization_endpoint,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 0b6d1f2b050..3f0b2f5d848 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -282,9 +282,10 @@ def count_devices_by_users_txn(
             "count_devices_by_users", count_devices_by_users_txn, user_ids
         )
 
+    @cached()
     async def get_device(
         self, user_id: str, device_id: str
-    ) -> Optional[Dict[str, Any]]:
+    ) -> Optional[Mapping[str, Any]]:
         """Retrieve a device. Only returns devices that are not marked as
         hidden.
 
@@ -1817,6 +1818,8 @@ async def store_device(
                 },
                 desc="store_device",
             )
+            await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
+
             if not inserted:
                 # if the device already exists, check if it's a real device, or
                 # if the device ID is reserved by something else
@@ -1882,6 +1885,9 @@ def _delete_devices_txn(txn: LoggingTransaction, device_ids: List[str]) -> None:
                 values=device_ids,
                 keyvalues={"user_id": user_id},
             )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
+            )
 
         for batch in batch_iter(device_ids, 100):
             await self.db_pool.runInteraction(
@@ -1915,6 +1921,7 @@ async def update_device(
             updatevalues=updates,
             desc="update_device",
         )
+        await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
 
     async def update_remote_device_list_cache_entry(
         self, user_id: str, device_id: str, content: JsonDict, stream_id: str
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 8380930c70e..c43f31353b6 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -759,17 +759,37 @@ def _record_user_external_id_txn(
             external_id: id on that system
             user_id: complete mxid that it is mapped to
         """
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_by_external_id, (auth_provider, external_id)
+        )
 
-        self.db_pool.simple_insert_txn(
+        # This INSERT ... ON CONFLICT DO NOTHING statement will cause a
+        # 'could not serialize access due to concurrent update'
+        # if the row is added concurrently by another transaction.
+        # This is exactly what we want, as it makes the transaction get retried
+        # in a new snapshot where we can check for a genuine conflict.
+        was_inserted = self.db_pool.simple_upsert_txn(
             txn,
             table="user_external_ids",
-            values={
-                "auth_provider": auth_provider,
-                "external_id": external_id,
-                "user_id": user_id,
-            },
+            keyvalues={"auth_provider": auth_provider, "external_id": external_id},
+            values={},
+            insertion_values={"user_id": user_id},
         )
 
+        if not was_inserted:
+            existing_id = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="user_external_ids",
+                keyvalues={"auth_provider": auth_provider, "user_id": user_id},
+                retcol="external_id",
+                allow_none=True,
+            )
+
+            if existing_id != external_id:
+                raise ExternalIDReuseException(
+                    f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}"
+                )
+
     async def remove_user_external_id(
         self, auth_provider: str, external_id: str, user_id: str
     ) -> None:
@@ -789,6 +809,9 @@ async def remove_user_external_id(
             },
             desc="remove_user_external_id",
         )
+        await self.invalidate_cache_and_stream(
+            "get_user_by_external_id", (auth_provider, external_id)
+        )
 
     async def replace_user_external_id(
         self,
@@ -809,29 +832,20 @@ async def replace_user_external_id(
             ExternalIDReuseException if the new external_id could not be mapped.
         """
 
-        def _remove_user_external_ids_txn(
+        def _replace_user_external_id_txn(
             txn: LoggingTransaction,
-            user_id: str,
         ) -> None:
-            """Remove all mappings from external user ids to a mxid
-            If these mappings are not found, this method does nothing.
-
-            Args:
-                user_id: complete mxid that it is mapped to
-            """
-
             self.db_pool.simple_delete_txn(
                 txn,
                 table="user_external_ids",
                 keyvalues={"user_id": user_id},
             )
 
-        def _replace_user_external_id_txn(
-            txn: LoggingTransaction,
-        ) -> None:
-            _remove_user_external_ids_txn(txn, user_id)
-
             for auth_provider, external_id in record_external_ids:
+                self._invalidate_cache_and_stream(
+                    txn, self.get_user_by_external_id, (auth_provider, external_id)
+                )
+
                 self._record_user_external_id_txn(
                     txn,
                     auth_provider,
@@ -847,6 +861,7 @@ def _replace_user_external_id_txn(
         except self.database_engine.module.IntegrityError:
             raise ExternalIDReuseException()
 
+    @cached()
     async def get_user_by_external_id(
         self, auth_provider: str, external_id: str
     ) -> Optional[str]:
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a0a6dcd04e7..dfa7dd48d9a 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1622,14 +1622,11 @@ def _set_room_participation_txn(
             sql = """
                 UPDATE room_memberships
                 SET participant = true
-                WHERE (user_id, room_id) IN (
-                    SELECT user_id, room_id
-                    FROM room_memberships
-                    WHERE user_id = ?
-                    AND room_id = ?
-                    ORDER BY event_stream_ordering DESC
-                    LIMIT 1
+                WHERE event_id IN (
+                    SELECT event_id FROM local_current_membership
+                    WHERE user_id = ? AND room_id = ?
                 )
+                AND NOT participant
             """
             txn.execute(sql, (user_id, room_id))
 
@@ -1651,11 +1648,10 @@ def _get_room_participation_txn(
         ) -> bool:
             sql = """
                 SELECT participant
-                FROM room_memberships
-                WHERE user_id = ?
-                AND room_id = ?
-                ORDER BY event_stream_ordering DESC
-                LIMIT 1
+                FROM local_current_membership AS l
+                INNER JOIN room_memberships AS r USING (event_id)
+                WHERE l.user_id = ?
+                AND l.room_id = ?
             """
             txn.execute(sql, (user_id, room_id))
             res = txn.fetchone()
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 90d7beb92fe..c1a66dcba02 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -48,6 +48,7 @@
     LoggingTransaction,
 )
 from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import MutableStateMap, StateKey, StateMap
@@ -914,6 +915,12 @@ def _purge_room_state_txn(
     ) -> None:
         # Delete all edges that reference a state group linked to room_id
         logger.info("[purge] removing %s from state_group_edges", room_id)
+
+        if isinstance(self.database_engine, PostgresEngine):
+            # Disable statement timeouts for this transaction; purging rooms can
+            # take a while!
+            txn.execute("SET LOCAL statement_timeout = 0")
+
         txn.execute(
             """
             DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 96b7ca83dcb..54b99134b9c 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
+    def __init__(
+        self,
+        clock: Clock,
+        name: str,
+        timeout_ms: float = 0,
+        enable_logging: bool = True,
+    ):
         self._result_cache: Dict[KV, ResponseCacheEntry] = {}
 
         self.clock = clock
@@ -109,6 +115,7 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
 
         self._name = name
         self._metrics = register_cache("response_cache", name, self, resizable=False)
+        self._enable_logging = enable_logging
 
     def size(self) -> int:
         return len(self._result_cache)
@@ -246,9 +253,12 @@ async def handle_request(request):
         """
         entry = self._get(key)
         if not entry:
-            logger.debug(
-                "[%s]: no cached result for [%s], calculating new one", self._name, key
-            )
+            if self._enable_logging:
+                logger.debug(
+                    "[%s]: no cached result for [%s], calculating new one",
+                    self._name,
+                    key,
+                )
             context = ResponseCacheContext(cache_key=key)
             if cache_context:
                 kwargs["cache_context"] = context
@@ -269,12 +279,15 @@ async def cb() -> RV:
             return await make_deferred_yieldable(entry.result.observe())
 
         result = entry.result.observe()
-        if result.called:
-            logger.info("[%s]: using completed cached result for [%s]", self._name, key)
-        else:
-            logger.info(
-                "[%s]: using incomplete cached result for [%s]", self._name, key
-            )
+        if self._enable_logging:
+            if result.called:
+                logger.info(
+                    "[%s]: using completed cached result for [%s]", self._name, key
+                )
+            else:
+                logger.info(
+                    "[%s]: using incomplete cached result for [%s]", self._name, key
+                )
 
         span_context = entry.opentracing_span_context
         with start_active_span_follows_from(
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index cfd9969563e..a7cead83d0d 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -484,6 +484,32 @@ def test_redirect_request(self) -> None:
         self.assertEqual(code_verifier, "")
         self.assertEqual(redirect, "http://client/redirect")
 
+    @override_config(
+        {
+            "oidc_config": {
+                **DEFAULT_CONFIG,
+                "passthrough_authorization_parameters": ["additional_parameter"],
+            }
+        }
+    )
+    def test_passthrough_parameters(self) -> None:
+        """The redirect request has additional parameters, one is authorized, one is not"""
+        req = Mock(spec=["cookies", "args"])
+        req.cookies = []
+        req.args = {}
+        req.args[b"additional_parameter"] = ["a_value".encode("utf-8")]
+        req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")]
+
+        url = urlparse(
+            self.get_success(
+                self.provider.handle_redirect_request(req, b"http://client/redirect")
+            )
+        )
+
+        params = parse_qs(url.query)
+        self.assertEqual(params["additional_parameter"], ["a_value"])
+        self.assertNotIn("not_authorized_parameters", params)
+
     @override_config({"oidc_config": DEFAULT_CONFIG})
     def test_redirect_request_with_code_challenge(self) -> None:
         """The redirect request has the right arguments & generates a valid session cookie."""