From 5c84f258095535aaa2a4a04c850f439fd00735cc Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti <andrewf@element.io> Date: Tue, 1 Apr 2025 11:51:00 -0400 Subject: [PATCH 01/11] complement-synapse: COPY existing dir from base (#18294) The base postgres image already has the /var/run/postgresql directory, and COPY can set file ownership with chown=, so COPY it instead of making it from scratch & manually setting its ownership. ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --- changelog.d/18294.docker | 1 + docker/complement/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/18294.docker diff --git a/changelog.d/18294.docker b/changelog.d/18294.docker new file mode 100644 index 00000000000..cc40ca90c0f --- /dev/null +++ b/changelog.d/18294.docker @@ -0,0 +1 @@ +Optimize the build of the complement-synapse image. diff --git a/docker/complement/Dockerfile b/docker/complement/Dockerfile index 3e7f808cc56..dd029c5fbc3 100644 --- a/docker/complement/Dockerfile +++ b/docker/complement/Dockerfile @@ -25,7 +25,7 @@ FROM $FROM RUN adduser --system --uid 999 postgres --home /var/lib/postgresql COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql -RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql +COPY --from=postgres_base --chown=postgres /var/run/postgresql /var/run/postgresql ENV PATH="${PATH}:/usr/lib/postgresql/13/bin" ENV PGDATA=/var/lib/postgresql/data From 0e3c0aeee833e52121b3167de486dff34018ab27 Mon Sep 17 00:00:00 2001 From: Jason Little <realtyem@gmail.com> Date: Wed, 2 Apr 2025 09:37:50 -0500 Subject: [PATCH 02/11] Disable Postgres statement timeouts while purging room state (#18133) --- changelog.d/18133.misc | 1 + synapse/storage/databases/state/store.py | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 changelog.d/18133.misc diff --git a/changelog.d/18133.misc b/changelog.d/18133.misc new file mode 100644 index 00000000000..151ceb2cab3 --- /dev/null +++ b/changelog.d/18133.misc @@ -0,0 +1 @@ +Disable statement timeout during room purge. 
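The store.py hunk below issues `SET LOCAL statement_timeout = 0`, which lifts Postgres's statement timeout only for the current transaction; the server-wide setting comes back into force when the purge transaction commits or rolls back. As a minimal standalone sketch of the same pattern, assuming a plain psycopg2 connection rather than Synapse's database layer (the connection string and the state group id are hypothetical):

```python
import psycopg2  # assumed driver; Synapse's Postgres support is built on psycopg2

conn = psycopg2.connect("dbname=synapse user=synapse")  # hypothetical connection string
try:
    with conn:  # the connection context manager commits on success, rolls back on error
        with conn.cursor() as cur:
            # SET LOCAL only lasts until the end of this transaction, so the
            # configured statement_timeout is restored automatically afterwards.
            cur.execute("SET LOCAL statement_timeout = 0")
            # The long-running purge queries would run here, e.g.:
            cur.execute(
                "DELETE FROM state_group_edges WHERE state_group = %s", (1234,)
            )
finally:
    conn.close()
```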
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 90d7beb92fe..c1a66dcba02 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -48,6 +48,7 @@ LoggingTransaction, ) from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateKey, StateMap @@ -914,6 +915,12 @@ def _purge_room_state_txn( ) -> None: # Delete all edges that reference a state group linked to room_id logger.info("[purge] removing %s from state_group_edges", room_id) + + if isinstance(self.database_engine, PostgresEngine): + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + txn.execute( """ DELETE FROM state_group_edges AS sge WHERE sge.state_group IN ( From dd05cc55eedbf086ae224a13c9ae9f0332d96b1f Mon Sep 17 00:00:00 2001 From: Olivier D <odelcroi@gmail.com> Date: Thu, 10 Apr 2025 15:39:27 +0200 Subject: [PATCH 03/11] Add passthrough_authorization_parameters support to OIDC configuration (#18232) # Add passthrough_authorization_parameters support to OIDC configuration This PR adds the `passthrough_authorization_parameters` option to OIDC configuration, allowing specific query parameters (like `login_hint`) to be passed from the redirect endpoint to the authorization grant URL. This enables clients to provide additional context to identity providers during authentication flows. # Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Quentin Gliech <quenting@element.io> --- changelog.d/18232.feature | 1 + .../configuration/config_documentation.md | 4 +++ synapse/config/oidc.py | 6 +++++ synapse/handlers/oidc.py | 12 ++++++++- tests/handlers/test_oidc.py | 26 +++++++++++++++++++ 5 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 changelog.d/18232.feature diff --git a/changelog.d/18232.feature b/changelog.d/18232.feature new file mode 100644 index 00000000000..ba5059ba80a --- /dev/null +++ b/changelog.d/18232.feature @@ -0,0 +1 @@ +Add `passthrough_authorization_parameters` in OIDC configuration to allow passing parameters to the authorization grant URL.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index d2d282f2037..73fd9622cec 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3672,6 +3672,9 @@ Options for each entry include: * `additional_authorization_parameters`: String to string dictionary that will be passed as additional parameters to the authorization grant URL. +* `passthrough_authorization_parameters`: List of parameters that will be passed through from the redirect endpoint + to the authorization grant URL. + * `allow_existing_users`: set to true to allow a user logging in via OIDC to match a pre-existing account instead of failing. This could be used if switching from password logins to OIDC. Defaults to false. @@ -3798,6 +3801,7 @@ oidc_providers: jwks_uri: "https://accounts.example.com/.well-known/jwks.json" additional_authorization_parameters: acr_values: 2fa + passthrough_authorization_parameters: ["login_hint"] skip_verification: true enable_registration: true user_mapping_provider: diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py index 8ba0ba2c360..3ddf65a3e91 100644 --- a/synapse/config/oidc.py +++ b/synapse/config/oidc.py @@ -356,6 +356,9 @@ def _parse_oidc_config_dict( additional_authorization_parameters=oidc_config.get( "additional_authorization_parameters", {} ), + passthrough_authorization_parameters=oidc_config.get( + "passthrough_authorization_parameters", [] + ), ) @@ -501,3 +504,6 @@ class OidcProviderConfig: # Additional parameters that will be passed to the authorization grant URL additional_authorization_parameters: Mapping[str, str] + + # Allow query parameters to the redirect endpoint that will be passed to the authorization grant URL + passthrough_authorization_parameters: Collection[str] diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index 18efdd9f6ee..c4cf0636a3a 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -467,6 +467,10 @@ def __init__( self._sso_handler.register_identity_provider(self) + self.passthrough_authorization_parameters = ( + provider.passthrough_authorization_parameters + ) + def _validate_metadata(self, m: OpenIDProviderMetadata) -> None: """Verifies the provider metadata. @@ -1005,7 +1009,6 @@ async def handle_redirect_request( when everything is done (or None for UI Auth) ui_auth_session_id: The session ID of the ongoing UI Auth (or None if this is a login). - Returns: The redirect URL to the authorization endpoint. 
@@ -1078,6 +1081,13 @@ async def handle_redirect_request( ) ) + # add passthrough additional authorization parameters + passthrough_authorization_parameters = self.passthrough_authorization_parameters + for parameter in passthrough_authorization_parameters: + parameter_value = parse_string(request, parameter) + if parameter_value: + additional_authorization_parameters.update({parameter: parameter_value}) + authorization_endpoint = metadata.get("authorization_endpoint") return prepare_grant_uri( authorization_endpoint, diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index cfd9969563e..a7cead83d0d 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -484,6 +484,32 @@ def test_redirect_request(self) -> None: self.assertEqual(code_verifier, "") self.assertEqual(redirect, "http://client/redirect") + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "passthrough_authorization_parameters": ["additional_parameter"], + } + } + ) + def test_passthrough_parameters(self) -> None: + """The redirect request has additional parameters, one is authorized, one is not""" + req = Mock(spec=["cookies", "args"]) + req.cookies = [] + req.args = {} + req.args[b"additional_parameter"] = ["a_value".encode("utf-8")] + req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")] + + url = urlparse( + self.get_success( + self.provider.handle_redirect_request(req, b"http://client/redirect") + ) + ) + + params = parse_qs(url.query) + self.assertEqual(params["additional_parameter"], ["a_value"]) + self.assertNotIn("not_authorized_parameter", params) + @override_config({"oidc_config": DEFAULT_CONFIG}) def test_redirect_request_with_code_challenge(self) -> None: """The redirect request has the right arguments & generates a valid session cookie.""" From ae701e17090324ea5182450f42fbc9cfa9352835 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erikj@element.io> Date: Mon, 14 Apr 2025 17:54:47 +0100 Subject: [PATCH 04/11] Add caches to new hot path functions (#18337) We call these two functions for every authed request when using delegated auth. --- changelog.d/18337.misc | 1 + synapse/handlers/device.py | 2 ++ synapse/storage/databases/main/devices.py | 9 ++++++- .../storage/databases/main/registration.py | 26 +++++++++---------- 4 files changed, 23 insertions(+), 15 deletions(-) create mode 100644 changelog.d/18337.misc diff --git a/changelog.d/18337.misc b/changelog.d/18337.misc new file mode 100644 index 00000000000..b78276fe765 --- /dev/null +++ b/changelog.d/18337.misc @@ -0,0 +1 @@ +Add cache to storage functions used to auth requests when using delegated auth.
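The diffs that follow put Synapse's `@cached()` decorator on `get_device` and `get_user_by_external_id`, and add cache invalidation to every code path that writes the underlying rows. A minimal standalone sketch of that read-through-cache-plus-invalidation pattern, using simplified names and an in-memory stand-in for the database rather than Synapse's actual cache machinery:

```python
from typing import Dict, List, Optional, Tuple

# In-memory stand-ins: `_db_rows` plays the role of the `devices` table,
# `_device_cache` the role of the `@cached()` result cache.
_db_rows: Dict[Tuple[str, str], dict] = {}
_device_cache: Dict[Tuple[str, str], Optional[dict]] = {}


def get_device(user_id: str, device_id: str) -> Optional[dict]:
    # Hot-path read: serve from the cache, fall back to the "database" on a miss.
    key = (user_id, device_id)
    if key not in _device_cache:
        _device_cache[key] = _db_rows.get(key)
    return _device_cache[key]


def store_device(user_id: str, device_id: str, row: dict) -> None:
    # Every write path must invalidate the corresponding cache entry, mirroring
    # the invalidate_cache_and_stream() calls added in the diff below.
    _db_rows[(user_id, device_id)] = row
    _device_cache.pop((user_id, device_id), None)


def delete_devices(user_id: str, device_ids: List[str]) -> None:
    # Bulk delete: invalidate one cache entry per removed device.
    for device_id in device_ids:
        _db_rows.pop((user_id, device_id), None)
        _device_cache.pop((user_id, device_id), None)
```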
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d9622080b4f..1efd039f227 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -163,6 +163,8 @@ async def get_device(self, user_id: str, device_id: str) -> JsonDict: raise errors.NotFoundError() ips = await self.store.get_last_client_ip_by_device(user_id, device_id) + + device = dict(device) _update_device_from_client_ips(device, ips) set_tag("device", str(device)) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 0b6d1f2b050..3f0b2f5d848 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -282,9 +282,10 @@ def count_devices_by_users_txn( "count_devices_by_users", count_devices_by_users_txn, user_ids ) + @cached() async def get_device( self, user_id: str, device_id: str - ) -> Optional[Dict[str, Any]]: + ) -> Optional[Mapping[str, Any]]: """Retrieve a device. Only returns devices that are not marked as hidden. @@ -1817,6 +1818,8 @@ async def store_device( }, desc="store_device", ) + await self.invalidate_cache_and_stream("get_device", (user_id, device_id)) + if not inserted: # if the device already exists, check if it's a real device, or # if the device ID is reserved by something else @@ -1882,6 +1885,9 @@ def _delete_devices_txn(txn: LoggingTransaction, device_ids: List[str]) -> None: values=device_ids, keyvalues={"user_id": user_id}, ) + self._invalidate_cache_and_stream_bulk( + txn, self.get_device, [(user_id, device_id) for device_id in device_ids] + ) for batch in batch_iter(device_ids, 100): await self.db_pool.runInteraction( @@ -1915,6 +1921,7 @@ async def update_device( updatevalues=updates, desc="update_device", ) + await self.invalidate_cache_and_stream("get_device", (user_id, device_id)) async def update_remote_device_list_cache_entry( self, user_id: str, device_id: str, content: JsonDict, stream_id: str diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 8380930c70e..eadbf4901c4 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -759,6 +759,9 @@ def _record_user_external_id_txn( external_id: id on that system user_id: complete mxid that it is mapped to """ + self._invalidate_cache_and_stream( + txn, self.get_user_by_external_id, (auth_provider, external_id) + ) self.db_pool.simple_insert_txn( txn, @@ -789,6 +792,9 @@ async def remove_user_external_id( }, desc="remove_user_external_id", ) + await self.invalidate_cache_and_stream( + "get_user_by_external_id", (auth_provider, external_id) + ) async def replace_user_external_id( self, @@ -809,29 +815,20 @@ async def replace_user_external_id( ExternalIDReuseException if the new external_id could not be mapped. """ - def _remove_user_external_ids_txn( + def _replace_user_external_id_txn( txn: LoggingTransaction, - user_id: str, ) -> None: - """Remove all mappings from external user ids to a mxid - If these mappings are not found, this method does nothing. 
- - Args: - user_id: complete mxid that it is mapped to - """ - self.db_pool.simple_delete_txn( txn, table="user_external_ids", keyvalues={"user_id": user_id}, ) - def _replace_user_external_id_txn( - txn: LoggingTransaction, - ) -> None: - _remove_user_external_ids_txn(txn, user_id) - for auth_provider, external_id in record_external_ids: + self._invalidate_cache_and_stream( + txn, self.get_user_by_external_id, (auth_provider, external_id) + ) + self._record_user_external_id_txn( txn, auth_provider, @@ -847,6 +844,7 @@ def _replace_user_external_id_txn( except self.database_engine.module.IntegrityError: raise ExternalIDReuseException() + @cached() async def get_user_by_external_id( self, auth_provider: str, external_id: str ) -> Optional[str]: From a832375bfb5b0327e73d2b5cf9104b555308e827 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 15 Apr 2025 07:49:08 -0700 Subject: [PATCH 05/11] Add total event, unencrypted message, and e2ee event counts to stats reporting (#18260) Co-authored-by: Eric Eastwood <erice@element.io> --- changelog.d/18260.feature | 1 + .../reporting_homeserver_usage_statistics.md | 9 +- synapse/app/phone_stats_home.py | 41 ++- .../databases/main/events_bg_updates.py | 290 +++++++++++++++++- synapse/storage/databases/main/metrics.py | 38 +++ synapse/storage/schema/__init__.py | 8 +- .../schema/main/delta/92/01_event_stats.sql | 33 ++ synapse/types/storage/__init__.py | 2 + tests/metrics/test_phone_home_stats.py | 258 ++++++++++++++++ tests/storage/test_event_stats.py | 237 ++++++++++++++ 10 files changed, 907 insertions(+), 10 deletions(-) create mode 100644 changelog.d/18260.feature create mode 100644 synapse/storage/schema/main/delta/92/01_event_stats.sql create mode 100644 tests/metrics/test_phone_home_stats.py create mode 100644 tests/storage/test_event_stats.py diff --git a/changelog.d/18260.feature b/changelog.d/18260.feature new file mode 100644 index 00000000000..e44e3dc990a --- /dev/null +++ b/changelog.d/18260.feature @@ -0,0 +1 @@ +Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics. diff --git a/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md b/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md index 4c0dbb5acd0..cdec7984100 100644 --- a/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md +++ b/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md @@ -30,10 +30,13 @@ The following statistics are sent to the configured reporting endpoint: | `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. | | `total_users` | int | The number of registered users on the homeserver. | | `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. | -| `daily_user_type_native` | int | The number of native users created in the last 24 hours. | +| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. | | `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. | | `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. | | `total_room_count` | int | The total number of rooms present on the homeserver. | +| `total_event_count` | int | The total number of events present on the homeserver. 
| +| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. | +| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. | | `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. | | `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. | | `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. | @@ -50,8 +53,8 @@ The following statistics are sent to the configured reporting endpoint: | `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. | | `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. | | `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. | -| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. | -| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. | +| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. | +| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. | [^1]: Native matrix users and guests are always counted. 
If the diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index f602bbbeea7..fe55838416f 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -34,6 +34,22 @@ logger = logging.getLogger("synapse.app.homeserver") +ONE_MINUTE_SECONDS = 60 +ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS + +MILLISECONDS_PER_SECOND = 1000 + +INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS +""" +We wait 5 minutes to send the first set of stats as the server can be quite busy the +first few minutes +""" + +PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS +""" +Phone home stats are sent every 3 hours +""" + # Contains the list of processes we will be monitoring # currently either 0 or 1 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = [] @@ -121,6 +137,9 @@ async def phone_stats_home( room_count = await store.get_room_count() stats["total_room_count"] = room_count + stats["total_event_count"] = await store.count_total_events() + stats["total_message_count"] = await store.count_total_messages() + stats["total_e2ee_event_count"] = await store.count_total_e2ee_events() stats["daily_active_users"] = common_metrics.daily_active_users stats["monthly_active_users"] = await store.count_monthly_users() @@ -185,12 +204,14 @@ def performance_stats_init() -> None: # If you increase the loop period, the accuracy of user_daily_visits # table will decrease clock.looping_call( - hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000 + hs.get_datastores().main.generate_user_daily_visits, + 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND, ) # monthly active user limiting functionality clock.looping_call( - hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60 + hs.get_datastores().main.reap_monthly_active_users, + ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND, ) hs.get_datastores().main.reap_monthly_active_users() @@ -216,12 +237,20 @@ async def generate_monthly_active_users() -> None: if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only: generate_monthly_active_users() - clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + clock.looping_call( + generate_monthly_active_users, + 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND, + ) # End of monthly active user settings if hs.config.metrics.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats) + clock.looping_call( + phone_stats_home, + PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND, + hs, + stats, + ) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -229,4 +258,6 @@ async def generate_monthly_active_users() -> None: # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later(5 * 60, phone_stats_home, hs, stats) + clock.call_later( + INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats + ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 4b0bdd79c67..b821d1c1b43 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -47,7 +47,7 @@ ) from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.storage.engines import PostgresEngine 
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES @@ -311,6 +311,12 @@ def __init__( self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update, ) + # Add a background update to add triggers which track event counts. + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + self._event_stats_populate_counts_bg_update, + ) + # We want this to run on the main database at startup before we start processing # events. # @@ -2547,6 +2553,288 @@ def _txn( return num_rows + async def _event_stats_populate_counts_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the `event_stats` table with initial + values, and register DB triggers to continue updating it. + + We first register TRIGGERs on rows being added/removed from the `events` table, + which will keep the event counts continuously updated. We also mark the stopping + point for the main population step so we don't double count events. + + Then we will iterate through the `events` table in batches and update event + counts until we reach the stopping point. + + This data is intended to be used by the phone-home stats to keep track + of total event and message counts. A trigger is preferred to counting + rows in the `events` table, as said table can grow quite large. + + It is also preferable to adding an index on the `events` table, as even + an index can grow large. And calculating total counts would require + querying that entire index. + """ + # The last event `stream_ordering` we processed (starting place of this next + # batch). + last_event_stream_ordering = progress.get( + "last_event_stream_ordering", -(1 << 31) + ) + # The event `stream_ordering` we should stop at. This is used to avoid double + # counting events that are already accounted for because of the triggers. + stop_event_stream_ordering: Optional[int] = progress.get( + "stop_event_stream_ordering", None + ) + + def _add_triggers_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + """ + Adds the triggers to the `events` table to keep the `event_stats` counts + up-to-date. + + Also populates the `stop_event_stream_ordering` background update progress + value. This marks the point at which we added the triggers, so we can avoid + double counting events that are already accounted for in the population + step. + + Returns: + The latest event `stream_ordering` in the `events` table when the triggers + were added or `None` if the `events` table is empty. + """ + + # Each time an event is inserted into the `events` table, update the stats. + # + # We're using `AFTER` triggers as we want to count successful inserts/deletes and + # not the ones that could potentially fail. 
+ if isinstance(txn.database_engine, Sqlite3Engine): + txn.execute( + """ + CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger + AFTER INSERT ON events + BEGIN + -- Always increment total_event_count + UPDATE event_stats SET total_event_count = total_event_count + 1; + + -- Increment unencrypted_message_count for m.room.message events + UPDATE event_stats + SET unencrypted_message_count = unencrypted_message_count + 1 + WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL; + + -- Increment e2ee_event_count for m.room.encrypted events + UPDATE event_stats + SET e2ee_event_count = e2ee_event_count + 1 + WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL; + END; + """ + ) + + txn.execute( + """ + CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger + AFTER DELETE ON events + BEGIN + -- Always decrement total_event_count + UPDATE event_stats SET total_event_count = total_event_count - 1; + + -- Decrement unencrypted_message_count for m.room.message events + UPDATE event_stats + SET unencrypted_message_count = unencrypted_message_count - 1 + WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL; + + -- Decrement e2ee_event_count for m.room.encrypted events + UPDATE event_stats + SET e2ee_event_count = e2ee_event_count - 1 + WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL; + END; + """ + ) + elif isinstance(txn.database_engine, PostgresEngine): + txn.execute( + """ + CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$ + BEGIN + IF TG_OP = 'INSERT' THEN + -- Always increment total_event_count + UPDATE event_stats SET total_event_count = total_event_count + 1; + + -- Increment unencrypted_message_count for m.room.message events + IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN + UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1; + END IF; + + -- Increment e2ee_event_count for m.room.encrypted events + IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN + UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1; + END IF; + + -- We're not modifying the row being inserted/deleted, so we return it unchanged. + RETURN NEW; + + ELSIF TG_OP = 'DELETE' THEN + -- Always decrement total_event_count + UPDATE event_stats SET total_event_count = total_event_count - 1; + + -- Decrement unencrypted_message_count for m.room.message events + IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN + UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1; + END IF; + + -- Decrement e2ee_event_count for m.room.encrypted events + IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN + UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1; + END IF; + + -- "The usual idiom in DELETE triggers is to return OLD." + -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html) + RETURN OLD; + END IF; + + RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). 
' + 'This indicates a trigger misconfiguration as this function should only' + 'run with INSERT/DELETE operations.', TG_OP; + END; + $BODY$ LANGUAGE plpgsql; + """ + ) + + # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres + # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html) + txn.execute( + """ + DO + $$BEGIN + CREATE TRIGGER event_stats_increment_counts_trigger + AFTER INSERT OR DELETE ON events + FOR EACH ROW + EXECUTE PROCEDURE event_stats_increment_counts(); + EXCEPTION + -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres + WHEN duplicate_object THEN + NULL; + END;$$; + """ + ) + else: + raise NotImplementedError("Unknown database engine") + + # Find the latest `stream_ordering` in the `events` table. We need to do + # this in the same transaction as where we add the triggers so we don't miss + # any events. + txn.execute( + """ + SELECT stream_ordering + FROM events + ORDER BY stream_ordering DESC + LIMIT 1 + """ + ) + row = cast(Optional[Tuple[int]], txn.fetchone()) + + # Update the progress + if row is not None: + (max_stream_ordering,) = row + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + {"stop_event_stream_ordering": max_stream_ordering}, + ) + return max_stream_ordering + + return None + + # First, add the triggers to keep the `event_stats` values up-to-date. + # + # If we don't have a `stop_event_stream_ordering` yet, we need to add the + # triggers to the `events` table and set the stopping point so we don't + # double count `events` later. + if stop_event_stream_ordering is None: + stop_event_stream_ordering = await self.db_pool.runInteraction( + "_event_stats_populate_counts_bg_update_add_triggers", + _add_triggers_txn, + ) + + # If there is no `stop_event_stream_ordering`, then there are no events + # in the `events` table and we can end the background update altogether. + if stop_event_stream_ordering is None: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE + ) + return batch_size + + def _populate_txn( + txn: LoggingTransaction, + ) -> int: + """ + Updates the `event_stats` table from this batch of events. + """ + + # Increment the counts based on the events present in this batch. + txn.execute( + """ + WITH event_batch AS ( + SELECT * + FROM events + WHERE stream_ordering > ? AND stream_ordering <= ? + ORDER BY stream_ordering ASC + LIMIT ? 
+ ), + batch_stats AS ( + SELECT + MAX(stream_ordering) AS max_stream_ordering, + COALESCE(COUNT(*), 0) AS total_event_count, + COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count, + COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count + FROM event_batch + + UNION ALL + + SELECT null, 0, 0, 0 + WHERE NOT EXISTS (SELECT 1 FROM event_batch) + LIMIT 1 + ) + UPDATE event_stats + SET + total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats), + unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats), + e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats) + RETURNING + (SELECT total_event_count FROM batch_stats) AS total_event_count, + (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering + """, + (last_event_stream_ordering, stop_event_stream_ordering, batch_size), + ) + + # Get the results of the update + (total_event_count, max_stream_ordering) = cast( + Tuple[int, Optional[int]], txn.fetchone() + ) + + # Update the progress + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + { + "last_event_stream_ordering": max_stream_ordering, + "stop_event_stream_ordering": stop_event_stream_ordering, + }, + ) + + return total_event_count + + num_rows_processed = await self.db_pool.runInteraction( + "_event_stats_populate_counts_bg_update", + _populate_txn, + ) + + # No more rows to process, so our background update is complete. + if not num_rows_processed: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE + ) + + return batch_size + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 9ce1100b5ce..a9cecc4bc1f 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -126,6 +126,44 @@ def _count_messages(txn: LoggingTransaction) -> int: return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages) + async def count_total_events(self) -> int: + """ + Returns the total number of events present on the server. + """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="total_event_count", + desc="count_total_events", + ) + + async def count_total_messages(self) -> int: + """ + Returns the total number of `m.room.message` events present on the + server. + """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="unencrypted_message_count", + desc="count_total_messages", + ) + + async def count_total_e2ee_events(self) -> int: + """ + Returns the total number of `m.room.encrypted` events present on the + server. 
+ """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="e2ee_event_count", + desc="count_total_e2ee_events", + ) + async def count_daily_sent_e2ee_messages(self) -> int: def _count_messages(txn: LoggingTransaction) -> int: # This is good enough as if you have silly characters in your own diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index ad683a3a07b..7474ba4542b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 91 # remember to update the list below when updating +SCHEMA_VERSION = 92 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 90 - Add a column `participant` to `room_memberships` table - Add background update to delete unreferenced state groups. + +Changes in SCHEMA_VERSION = 91 + - TODO + +Changes in SCHEMA_VERSION = 92 + - Add `event_stats` table to store global event statistics like total counts """ diff --git a/synapse/storage/schema/main/delta/92/01_event_stats.sql b/synapse/storage/schema/main/delta/92/01_event_stats.sql new file mode 100644 index 00000000000..4bded035784 --- /dev/null +++ b/synapse/storage/schema/main/delta/92/01_event_stats.sql @@ -0,0 +1,33 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + + +-- Create the `event_stats` table to store these statistics. +CREATE TABLE event_stats ( + total_event_count INTEGER NOT NULL DEFAULT 0, + unencrypted_message_count INTEGER NOT NULL DEFAULT 0, + e2ee_event_count INTEGER NOT NULL DEFAULT 0 +); + +-- Insert initial values into the table. +INSERT INTO event_stats ( + total_event_count, + unencrypted_message_count, + e2ee_event_count +) VALUES (0, 0, 0); + +-- Add a background update to populate the `event_stats` table with the current counts +-- from the `events` table and add triggers to keep this count up-to-date. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9201, 'event_stats_populate_counts_bg_update', '{}'); + diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index e03ff7ffc8c..73d19d91ed2 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -52,3 +52,5 @@ class _BackgroundUpdates: MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = ( "mark_unreferenced_state_groups_for_deletion_bg_update" ) + + EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update" diff --git a/tests/metrics/test_phone_home_stats.py b/tests/metrics/test_phone_home_stats.py new file mode 100644 index 00000000000..1b3eafed5fc --- /dev/null +++ b/tests/metrics/test_phone_home_stats.py @@ -0,0 +1,258 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. 
+# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. + +import logging +from unittest.mock import AsyncMock + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.app.phone_stats_home import ( + PHONE_HOME_INTERVAL_SECONDS, + start_phone_stats_home, +) +from synapse.rest import admin, login, register, room +from synapse.server import HomeServer +from synapse.types import JsonDict +from synapse.util import Clock + +from tests import unittest +from tests.server import ThreadedMemoryReactorClock + +TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats" +TEST_SERVER_CONTEXT = "test-server-context" + + +class PhoneHomeStatsTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + register.register_servlets, + login.register_servlets, + ] + + def make_homeserver( + self, reactor: ThreadedMemoryReactorClock, clock: Clock + ) -> HomeServer: + # Configure the homeserver to enable stats reporting. + config = self.default_config() + config["report_stats"] = True + config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT + + # Configure the server context so we can check it ends up being reported + config["server_context"] = TEST_SERVER_CONTEXT + + # Allow guests to be registered + config["allow_guest_access"] = True + + hs = self.setup_test_homeserver(config=config) + + # Replace the proxied http client with a mock, so we can inspect outbound requests to + # the configured stats endpoint. + self.put_json_mock = AsyncMock(return_value={}) + hs.get_proxied_http_client().put_json = self.put_json_mock # type: ignore[method-assign] + return hs + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + # Wait for the background updates to add the database triggers that keep the + # `event_stats` table up-to-date. + self.wait_for_background_updates() + + # Force stats reporting to occur + start_phone_stats_home(hs=hs) + + super().prepare(reactor, clock, hs) + + def _get_latest_phone_home_stats(self) -> JsonDict: + # Wait for `phone_stats_home` to be called again + a healthy margin (50s). + self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50) + + # Extract the reported stats from our http client mock + mock_calls = self.put_json_mock.call_args_list + report_stats_calls = [] + for call in mock_calls: + if call.args[0] == TEST_REPORT_STATS_ENDPOINT: + report_stats_calls.append(call) + + self.assertGreaterEqual( + (len(report_stats_calls)), + 1, + "Expected at-least one call to the report_stats endpoint", + ) + + # Extract the phone home stats from the call + phone_home_stats = report_stats_calls[0].args[1] + + return phone_home_stats + + def _perform_user_actions(self) -> None: + """ + Perform some actions on the homeserver that would bump the phone home + stats. 
+ """ + + # Create some users + user_1_mxid = self.register_user( + username="test_user_1", + password="test", + ) + user_2_mxid = self.register_user( + username="test_user_2", + password="test", + ) + # Note: `self.register_user` does not support guest registration, and updating the + # Admin API it calls to add a new parameter would cause the `mac` parameter to fail + # in a backwards-incompatible manner. Hence, we make a manual request here. + _guest_user_mxid = self.make_request( + method="POST", + path="/_matrix/client/v3/register?kind=guest", + content={ + "username": "guest_user", + "password": "test", + }, + shorthand=False, + ) + + # Log in to each user + user_1_token = self.login(username=user_1_mxid, password="test") + user_2_token = self.login(username=user_2_mxid, password="test") + + # Create a room between the two users + room_1_id = self.helper.create_room_as( + is_public=False, + tok=user_1_token, + ) + + # Mark this room as end-to-end encrypted + self.helper.send_state( + room_id=room_1_id, + event_type="m.room.encryption", + body={ + "algorithm": "m.megolm.v1.aes-sha2", + "rotation_period_ms": 604800000, + "rotation_period_msgs": 100, + }, + state_key="", + tok=user_1_token, + ) + + # User 1 invites user 2 + self.helper.invite( + room=room_1_id, + src=user_1_mxid, + targ=user_2_mxid, + tok=user_1_token, + ) + + # User 2 joins + self.helper.join( + room=room_1_id, + user=user_2_mxid, + tok=user_2_token, + ) + + # User 1 sends 10 unencrypted messages + for _ in range(10): + self.helper.send( + room_id=room_1_id, + body="Zoinks Scoob! A message!", + tok=user_1_token, + ) + + # User 2 sends 5 encrypted "messages" + for _ in range(5): + self.helper.send_event( + room_id=room_1_id, + type="m.room.encrypted", + content={ + "algorithm": "m.olm.v1.curve25519-aes-sha2", + "sender_key": "some_key", + "ciphertext": { + "some_key": { + "type": 0, + "body": "encrypted_payload", + }, + }, + }, + tok=user_2_token, + ) + + def test_phone_home_stats(self) -> None: + """ + Test that the phone home stats contain the stats we expect based on + the scenario carried out in `prepare` + """ + # Do things to bump the stats + self._perform_user_actions() + + # Wait for the stats to be reported + phone_home_stats = self._get_latest_phone_home_stats() + + self.assertEqual( + phone_home_stats["homeserver"], self.hs.config.server.server_name + ) + + self.assertTrue(isinstance(phone_home_stats["memory_rss"], int)) + self.assertTrue(isinstance(phone_home_stats["cpu_average"], int)) + + self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT) + + self.assertTrue(isinstance(phone_home_stats["timestamp"], int)) + self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int)) + self.assertTrue(isinstance(phone_home_stats["python_version"], str)) + + # We expect only our test users to exist on the homeserver + self.assertEqual(phone_home_stats["total_users"], 3) + self.assertEqual(phone_home_stats["total_nonbridged_users"], 3) + self.assertEqual(phone_home_stats["daily_user_type_native"], 2) + self.assertEqual(phone_home_stats["daily_user_type_guest"], 1) + self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0) + self.assertEqual(phone_home_stats["total_room_count"], 1) + self.assertEqual(phone_home_stats["total_event_count"], 24) + self.assertEqual(phone_home_stats["total_message_count"], 10) + self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5) + self.assertEqual(phone_home_stats["daily_active_users"], 2) + 
self.assertEqual(phone_home_stats["monthly_active_users"], 2) + self.assertEqual(phone_home_stats["daily_active_rooms"], 1) + self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1) + self.assertEqual(phone_home_stats["daily_messages"], 10) + self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5) + self.assertEqual(phone_home_stats["daily_sent_messages"], 10) + self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5) + + # Our users have not been around for >30 days, hence these are all 0. + self.assertEqual(phone_home_stats["r30v2_users_all"], 0) + self.assertEqual(phone_home_stats["r30v2_users_android"], 0) + self.assertEqual(phone_home_stats["r30v2_users_ios"], 0) + self.assertEqual(phone_home_stats["r30v2_users_electron"], 0) + self.assertEqual(phone_home_stats["r30v2_users_web"], 0) + self.assertEqual( + phone_home_stats["cache_factor"], self.hs.config.caches.global_factor + ) + self.assertEqual( + phone_home_stats["event_cache_size"], + self.hs.config.caches.event_cache_size, + ) + self.assertEqual( + phone_home_stats["database_engine"], + self.hs.config.database.databases[0].config["name"], + ) + self.assertEqual( + phone_home_stats["database_server_version"], + self.hs.get_datastores().main.database_engine.server_version, + ) + + synapse_logger = logging.getLogger("synapse") + log_level = synapse_logger.getEffectiveLevel() + self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level)) diff --git a/tests/storage/test_event_stats.py b/tests/storage/test_event_stats.py new file mode 100644 index 00000000000..791ed27018e --- /dev/null +++ b/tests/storage/test_event_stats.py @@ -0,0 +1,237 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. + + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.rest import admin, login, register, room +from synapse.server import HomeServer +from synapse.types.storage import _BackgroundUpdates +from synapse.util import Clock + +from tests import unittest + + +class EventStatsTestCase(unittest.HomeserverTestCase): + """ + Tests for the `event_stats` table + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + register.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + # Wait for the background updates to add the database triggers that keep the + # `event_stats` table up-to-date. + # + # This also prevents background updates running during the tests and messing + # with the results. + self.wait_for_background_updates() + + super().prepare(reactor, clock, hs) + + def _perform_user_actions(self) -> None: + """ + Perform some actions on the homeserver that would bump the event counts. 
+ """ + # Create some users + user_1_mxid = self.register_user( + username="test_user_1", + password="test", + ) + user_2_mxid = self.register_user( + username="test_user_2", + password="test", + ) + # Note: `self.register_user` does not support guest registration, and updating the + # Admin API it calls to add a new parameter would cause the `mac` parameter to fail + # in a backwards-incompatible manner. Hence, we make a manual request here. + _guest_user_mxid = self.make_request( + method="POST", + path="/_matrix/client/v3/register?kind=guest", + content={ + "username": "guest_user", + "password": "test", + }, + shorthand=False, + ) + + # Log in to each user + user_1_token = self.login(username=user_1_mxid, password="test") + user_2_token = self.login(username=user_2_mxid, password="test") + + # Create a room between the two users + room_1_id = self.helper.create_room_as( + is_public=False, + tok=user_1_token, + ) + + # Mark this room as end-to-end encrypted + self.helper.send_state( + room_id=room_1_id, + event_type="m.room.encryption", + body={ + "algorithm": "m.megolm.v1.aes-sha2", + "rotation_period_ms": 604800000, + "rotation_period_msgs": 100, + }, + state_key="", + tok=user_1_token, + ) + + # User 1 invites user 2 + self.helper.invite( + room=room_1_id, + src=user_1_mxid, + targ=user_2_mxid, + tok=user_1_token, + ) + + # User 2 joins + self.helper.join( + room=room_1_id, + user=user_2_mxid, + tok=user_2_token, + ) + + # User 1 sends 10 unencrypted messages + for _ in range(10): + self.helper.send( + room_id=room_1_id, + body="Zoinks Scoob! A message!", + tok=user_1_token, + ) + + # User 2 sends 5 encrypted "messages" + for _ in range(5): + self.helper.send_event( + room_id=room_1_id, + type="m.room.encrypted", + content={ + "algorithm": "m.olm.v1.curve25519-aes-sha2", + "sender_key": "some_key", + "ciphertext": { + "some_key": { + "type": 0, + "body": "encrypted_payload", + }, + }, + }, + tok=user_2_token, + ) + + def test_background_update_with_events(self) -> None: + """ + Test that the background update to populate the `event_stats` table works + correctly when there are events in the database. + """ + # Do things to bump the stats + self._perform_user_actions() + + # Keep in mind: These are already populated as the background update has already + # ran once when Synapse started and added the database triggers which are + # incrementing things as new events come in. + self.assertEqual(self.get_success(self.store.count_total_events()), 24) + self.assertEqual(self.get_success(self.store.count_total_messages()), 10) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5) + + # Run the background update again + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + "progress_json": "{}", + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # We expect these values to double as the background update is being run *again* + # and will double-count the `events`. + self.assertEqual(self.get_success(self.store.count_total_events()), 48) + self.assertEqual(self.get_success(self.store.count_total_messages()), 20) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10) + + def test_background_update_without_events(self) -> None: + """ + Test that the background update to populate the `event_stats` table works + correctly without events in the database. 
+ """ + # Keep in mind: These are already populated as the background update has already + # ran once when Synapse started and added the database triggers which are + # incrementing things as new events come in. + # + # In this case, no events have been sent, so we expect the counts to be 0. + self.assertEqual(self.get_success(self.store.count_total_events()), 0) + self.assertEqual(self.get_success(self.store.count_total_messages()), 0) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0) + + # Run the background update again + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + "progress_json": "{}", + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + self.assertEqual(self.get_success(self.store.count_total_events()), 0) + self.assertEqual(self.get_success(self.store.count_total_messages()), 0) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0) + + def test_background_update_resume_progress(self) -> None: + """ + Test that the background update to populate the `event_stats` table works + correctly to resume from `progress_json`. + """ + # Do things to bump the stats + self._perform_user_actions() + + # Keep in mind: These are already populated as the background update has already + # ran once when Synapse started and added the database triggers which are + # incrementing things as new events come in. + self.assertEqual(self.get_success(self.store.count_total_events()), 24) + self.assertEqual(self.get_success(self.store.count_total_messages()), 10) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5) + + # Run the background update again + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + "progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }', + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # We expect these values to increase as the background update is being run + # *again* and will double-count some of the `events` over the range specified + # by the `progress_json`. + self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7) + self.assertEqual(self.get_success(self.store.count_total_messages()), 16) + self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6) From 19b0e23c3d0af4a372194a6510281bd4ca3c1489 Mon Sep 17 00:00:00 2001 From: reivilibre <oliverw@element.io> Date: Tue, 15 Apr 2025 14:58:30 +0000 Subject: [PATCH 06/11] Fix the token introspection cache logging access tokens when MAS integration is in use. (#18335) The `ResponseCache` logs keys by default. Let's not do that for access tokens. --------- Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org> --- changelog.d/18335.bugfix | 1 + synapse/api/auth/msc3861_delegated.py | 2 ++ synapse/util/caches/response_cache.py | 33 +++++++++++++++++++-------- 3 files changed, 26 insertions(+), 10 deletions(-) create mode 100644 changelog.d/18335.bugfix diff --git a/changelog.d/18335.bugfix b/changelog.d/18335.bugfix new file mode 100644 index 00000000000..50df5a1b1d7 --- /dev/null +++ b/changelog.d/18335.bugfix @@ -0,0 +1 @@ +Fix the token introspection cache logging access tokens when MAS integration is in use. 
\ No newline at end of file diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 74e526123fd..cc2c79fa962 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -201,6 +201,8 @@ def __init__(self, hs: "HomeServer"): self._clock, "token_introspection", timeout_ms=120_000, + # don't log because the keys are access tokens + enable_logging=False, ) self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata]( diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 96b7ca83dcb..54b99134b9c 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]): used rather than trying to compute a new response. """ - def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): + def __init__( + self, + clock: Clock, + name: str, + timeout_ms: float = 0, + enable_logging: bool = True, + ): self._result_cache: Dict[KV, ResponseCacheEntry] = {} self.clock = clock @@ -109,6 +115,7 @@ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): self._name = name self._metrics = register_cache("response_cache", name, self, resizable=False) + self._enable_logging = enable_logging def size(self) -> int: return len(self._result_cache) @@ -246,9 +253,12 @@ async def handle_request(request): """ entry = self._get(key) if not entry: - logger.debug( - "[%s]: no cached result for [%s], calculating new one", self._name, key - ) + if self._enable_logging: + logger.debug( + "[%s]: no cached result for [%s], calculating new one", + self._name, + key, + ) context = ResponseCacheContext(cache_key=key) if cache_context: kwargs["cache_context"] = context @@ -269,12 +279,15 @@ async def cb() -> RV: return await make_deferred_yieldable(entry.result.observe()) result = entry.result.observe() - if result.called: - logger.info("[%s]: using completed cached result for [%s]", self._name, key) - else: - logger.info( - "[%s]: using incomplete cached result for [%s]", self._name, key - ) + if self._enable_logging: + if result.called: + logger.info( + "[%s]: using completed cached result for [%s]", self._name, key + ) + else: + logger.info( + "[%s]: using incomplete cached result for [%s]", self._name, key + ) span_context = entry.opentracing_span_context with start_active_span_follows_from( From 45420b1d42416a3461f08aa3e6752c160c93366a Mon Sep 17 00:00:00 2001 From: Erik Johnston <erikj@element.io> Date: Tue, 15 Apr 2025 16:02:27 +0100 Subject: [PATCH 07/11] Fix `force_tracing_for_users` config when using MAS (#18334) This is a copy of what we do for internal auth, and we should figure out a way to deduplicate some of this stuff: https://github.com/element-hq/synapse/blob/dd05cc55eedbf086ae224a13c9ae9f0332d96b1f/synapse/api/auth/internal.py#L62-L110 --- changelog.d/18334.bugfix | 1 + synapse/api/auth/msc3861_delegated.py | 51 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 changelog.d/18334.bugfix diff --git a/changelog.d/18334.bugfix b/changelog.d/18334.bugfix new file mode 100644 index 00000000000..d82e522cb89 --- /dev/null +++ b/changelog.d/18334.bugfix @@ -0,0 +1 @@ +Fix `force_tracing_for_users` config when using delegated auth. 
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index cc2c79fa962..0598286cf41 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -45,6 +45,7 @@ ) from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable +from synapse.logging.opentracing import active_span, force_tracing, start_active_span from synapse.types import Requester, UserID, create_requester from synapse.util import json_decoder from synapse.util.caches.cached_call import RetryOnExceptionCachedCall @@ -177,6 +178,7 @@ def __init__(self, hs: "HomeServer"): self._http_client = hs.get_proxied_http_client() self._hostname = hs.hostname self._admin_token: Callable[[], Optional[str]] = self._config.admin_token + self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users # # Token Introspection Cache # This remembers what users/devices are represented by which access tokens, @@ -363,6 +365,55 @@ async def get_user_by_req( allow_guest: bool = False, allow_expired: bool = False, allow_locked: bool = False, + ) -> Requester: + """Get a registered user's ID. + + Args: + request: An HTTP request with an access_token query parameter. + allow_guest: If False, will raise an AuthError if the user making the + request is a guest. + allow_expired: If True, allow the request through even if the account + is expired, or session token lifetime has ended. Note that + /login will deliver access tokens regardless of expiration. + + Returns: + Resolves to the requester + Raises: + InvalidClientCredentialsError if no user by that token exists or the token + is invalid. + AuthError if access is denied for the user in the access token + """ + parent_span = active_span() + with start_active_span("get_user_by_req"): + requester = await self._wrapped_get_user_by_req( + request, allow_guest, allow_expired, allow_locked + ) + + if parent_span: + if requester.authenticated_entity in self._force_tracing_for_users: + # request tracing is enabled for this user, so we need to force it + # tracing on for the parent span (which will be the servlet span). + # + # It's too late for the get_user_by_req span to inherit the setting, + # so we also force it on for that. 
+ force_tracing() + force_tracing(parent_span) + parent_span.set_tag( + "authenticated_entity", requester.authenticated_entity + ) + parent_span.set_tag("user_id", requester.user.to_string()) + if requester.device_id is not None: + parent_span.set_tag("device_id", requester.device_id) + if requester.app_service is not None: + parent_span.set_tag("appservice_id", requester.app_service.id) + return requester + + async def _wrapped_get_user_by_req( + self, + request: SynapseRequest, + allow_guest: bool = False, + allow_expired: bool = False, + allow_locked: bool = False, ) -> Requester: access_token = self.get_access_token_from_request(request) From 2c7a61e311002ebec0e3f5aff054f46dfb0015c5 Mon Sep 17 00:00:00 2001 From: Quentin Gliech <quenting@element.io> Date: Tue, 15 Apr 2025 17:30:45 +0200 Subject: [PATCH 08/11] Don't cache introspection failures (#18339) --- changelog.d/18339.bugfix | 1 + synapse/api/auth/msc3861_delegated.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 changelog.d/18339.bugfix diff --git a/changelog.d/18339.bugfix b/changelog.d/18339.bugfix new file mode 100644 index 00000000000..09d6d734200 --- /dev/null +++ b/changelog.d/18339.bugfix @@ -0,0 +1 @@ +Stop caching introspection failures when delegating auth to MAS. diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 0598286cf41..9ded3366e3f 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -49,7 +49,7 @@ from synapse.types import Requester, UserID, create_requester from synapse.util import json_decoder from synapse.util.caches.cached_call import RetryOnExceptionCachedCall -from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext if TYPE_CHECKING: from synapse.rest.admin.experimental_features import ExperimentalFeature @@ -279,7 +279,9 @@ async def _introspection_endpoint(self) -> str: metadata = await self._issuer_metadata.get() return metadata.get("introspection_endpoint") - async def _introspect_token(self, token: str) -> IntrospectionResult: + async def _introspect_token( + self, token: str, cache_context: ResponseCacheContext[str] + ) -> IntrospectionResult: """ Send a token to the introspection endpoint and returns the introspection response @@ -295,6 +297,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult: Returns: The introspection response """ + # By default, we shouldn't cache the result unless we know it's valid + cache_context.should_cache = False introspection_endpoint = await self._introspection_endpoint() raw_headers: Dict[str, str] = { "Content-Type": "application/x-www-form-urlencoded", @@ -352,6 +356,8 @@ async def _introspect_token(self, token: str) -> IntrospectionResult: "The introspection endpoint returned an invalid JSON response." 
) + # We had a valid response, so we can cache it + cache_context.should_cache = True return IntrospectionResult( IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec() ) @@ -482,7 +488,7 @@ async def get_user_by_access_token( try: introspection_result = await self._introspection_cache.wrap( - token, self._introspect_token, token + token, self._introspect_token, token, cache_context=True ) except Exception: logger.exception("Failed to introspect token") From 0046d7278bd8e350dcef40b95a05e116e6e66d90 Mon Sep 17 00:00:00 2001 From: Quentin Gliech <quenting@element.io> Date: Wed, 16 Apr 2025 09:34:58 +0200 Subject: [PATCH 09/11] Fix ExternalIDReuse exception for concurrent transactions (#18342) --- changelog.d/18342.bugfix | 1 + .../storage/databases/main/registration.py | 29 +++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 changelog.d/18342.bugfix diff --git a/changelog.d/18342.bugfix b/changelog.d/18342.bugfix new file mode 100644 index 00000000000..6fa2fa679a9 --- /dev/null +++ b/changelog.d/18342.bugfix @@ -0,0 +1 @@ +Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index eadbf4901c4..c43f31353b6 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -763,16 +763,33 @@ def _record_user_external_id_txn( txn, self.get_user_by_external_id, (auth_provider, external_id) ) - self.db_pool.simple_insert_txn( + # This INSERT ... ON CONFLICT DO NOTHING statement will cause a + # 'could not serialize access due to concurrent update' + # if the row is added concurrently by another transaction. + # This is exactly what we want, as it makes the transaction get retried + # in a new snapshot where we can check for a genuine conflict. + was_inserted = self.db_pool.simple_upsert_txn( txn, table="user_external_ids", - values={ - "auth_provider": auth_provider, - "external_id": external_id, - "user_id": user_id, - }, + keyvalues={"auth_provider": auth_provider, "external_id": external_id}, + values={}, + insertion_values={"user_id": user_id}, ) + if not was_inserted: + existing_id = self.db_pool.simple_select_one_onecol_txn( + txn, + table="user_external_ids", + keyvalues={"auth_provider": auth_provider, "user_id": user_id}, + retcol="external_id", + allow_none=True, + ) + + if existing_id != external_id: + raise ExternalIDReuseException( + f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}" + ) + async def remove_user_external_id( self, auth_provider: str, external_id: str, user_id: str ) -> None: From c16a981f22dd559b56caa94a46392c206be9a265 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erikj@element.io> Date: Wed, 16 Apr 2025 14:14:56 +0100 Subject: [PATCH 10/11] Fix query for room participation (#18345) Follow on from #18068 Currently the subquery in `UPDATE` is pointless, as it will still just update all `room_membership` rows. Instead, we should look at the current membership event ID (which is easily retrieved from `local_current_membership`). We also add a `AND NOT participant` to noop the `UPDATE` when the `participant` flag is already set. 
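Concretely, the rewritten statement ends up looking like the sketch below (the authoritative change is in the diff that follows; the function name and the `txn` cursor object here are only illustrative). It resolves the single current membership event via `local_current_membership` and skips the write entirely when `participant` is already set:

```python
# Sketch only: mirrors the UPDATE emitted by _set_room_participation_txn in the
# diff below, assuming `txn` is a DB-API style cursor/transaction object.
def set_room_participation_sketch(txn, user_id: str, room_id: str) -> None:
    txn.execute(
        """
        UPDATE room_memberships
        SET participant = true
        WHERE event_id IN (
            SELECT event_id FROM local_current_membership
            WHERE user_id = ? AND room_id = ?
        )
        AND NOT participant
        """,
        (user_id, room_id),
    )
```

Because `local_current_membership` holds at most one row per local user and room, the `IN` subquery matches at most one `room_memberships` row, rather than updating every historical membership row as the old subquery did.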
cc @H-Shay --- changelog.d/18345.bugfix | 1 + synapse/storage/databases/main/roommember.py | 20 ++++++++------------ 2 files changed, 9 insertions(+), 12 deletions(-) create mode 100644 changelog.d/18345.bugfix diff --git a/changelog.d/18345.bugfix b/changelog.d/18345.bugfix new file mode 100644 index 00000000000..c8a001d4a3f --- /dev/null +++ b/changelog.d/18345.bugfix @@ -0,0 +1 @@ +Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a0a6dcd04e7..dfa7dd48d9a 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1622,14 +1622,11 @@ def _set_room_participation_txn( sql = """ UPDATE room_memberships SET participant = true - WHERE (user_id, room_id) IN ( - SELECT user_id, room_id - FROM room_memberships - WHERE user_id = ? - AND room_id = ? - ORDER BY event_stream_ordering DESC - LIMIT 1 + WHERE event_id IN ( + SELECT event_id FROM local_current_membership + WHERE user_id = ? AND room_id = ? ) + AND NOT participant """ txn.execute(sql, (user_id, room_id)) @@ -1651,11 +1648,10 @@ def _get_room_participation_txn( ) -> bool: sql = """ SELECT participant - FROM room_memberships - WHERE user_id = ? - AND room_id = ? - ORDER BY event_stream_ordering DESC - LIMIT 1 + FROM local_current_membership AS l + INNER JOIN room_memberships AS r USING (event_id) + WHERE l.user_id = ? + AND l.room_id = ? """ txn.execute(sql, (user_id, room_id)) res = txn.fetchone() From 89cb613a4ef321d2eb52f13b94d1f1fc3205bad1 Mon Sep 17 00:00:00 2001 From: Devon Hudson <devon.dmytro@gmail.com> Date: Wed, 16 Apr 2025 16:41:41 +0000 Subject: [PATCH 11/11] Revert "Add total event, unencrypted message, and e2ee event counts to stats reporting" (#18346) Reverts element-hq/synapse#18260 It is causing a failure when building release debs for `debian:bullseye` with the following error: ``` sqlite3.OperationalError: near "RETURNING": syntax error ``` --- changelog.d/18260.feature | 1 - .../reporting_homeserver_usage_statistics.md | 9 +- synapse/app/phone_stats_home.py | 41 +-- .../databases/main/events_bg_updates.py | 290 +----------------- synapse/storage/databases/main/metrics.py | 38 --- synapse/storage/schema/__init__.py | 8 +- .../schema/main/delta/92/01_event_stats.sql | 33 -- synapse/types/storage/__init__.py | 2 - tests/metrics/test_phone_home_stats.py | 258 ---------------- tests/storage/test_event_stats.py | 237 -------------- 10 files changed, 10 insertions(+), 907 deletions(-) delete mode 100644 changelog.d/18260.feature delete mode 100644 synapse/storage/schema/main/delta/92/01_event_stats.sql delete mode 100644 tests/metrics/test_phone_home_stats.py delete mode 100644 tests/storage/test_event_stats.py diff --git a/changelog.d/18260.feature b/changelog.d/18260.feature deleted file mode 100644 index e44e3dc990a..00000000000 --- a/changelog.d/18260.feature +++ /dev/null @@ -1 +0,0 @@ -Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics. 
diff --git a/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md b/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md index cdec7984100..4c0dbb5acd0 100644 --- a/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md +++ b/docs/usage/administration/monitoring/reporting_homeserver_usage_statistics.md @@ -30,13 +30,10 @@ The following statistics are sent to the configured reporting endpoint: | `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. | | `total_users` | int | The number of registered users on the homeserver. | | `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. | -| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. | +| `daily_user_type_native` | int | The number of native users created in the last 24 hours. | | `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. | | `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. | | `total_room_count` | int | The total number of rooms present on the homeserver. | -| `total_event_count` | int | The total number of events present on the homeserver. | -| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. | -| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. | | `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. | | `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. | | `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. | @@ -53,8 +50,8 @@ The following statistics are sent to the configured reporting endpoint: | `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. | | `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. | | `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. | -| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. | -| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. | +| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. | +| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. | [^1]: Native matrix users and guests are always counted. 
If the diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index fe55838416f..f602bbbeea7 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -34,22 +34,6 @@ logger = logging.getLogger("synapse.app.homeserver") -ONE_MINUTE_SECONDS = 60 -ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS - -MILLISECONDS_PER_SECOND = 1000 - -INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS -""" -We wait 5 minutes to send the first set of stats as the server can be quite busy the -first few minutes -""" - -PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS -""" -Phone home stats are sent every 3 hours -""" - # Contains the list of processes we will be monitoring # currently either 0 or 1 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = [] @@ -137,9 +121,6 @@ async def phone_stats_home( room_count = await store.get_room_count() stats["total_room_count"] = room_count - stats["total_event_count"] = await store.count_total_events() - stats["total_message_count"] = await store.count_total_messages() - stats["total_e2ee_event_count"] = await store.count_total_e2ee_events() stats["daily_active_users"] = common_metrics.daily_active_users stats["monthly_active_users"] = await store.count_monthly_users() @@ -204,14 +185,12 @@ def performance_stats_init() -> None: # If you increase the loop period, the accuracy of user_daily_visits # table will decrease clock.looping_call( - hs.get_datastores().main.generate_user_daily_visits, - 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND, + hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000 ) # monthly active user limiting functionality clock.looping_call( - hs.get_datastores().main.reap_monthly_active_users, - ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND, + hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60 ) hs.get_datastores().main.reap_monthly_active_users() @@ -237,20 +216,12 @@ async def generate_monthly_active_users() -> None: if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only: generate_monthly_active_users() - clock.looping_call( - generate_monthly_active_users, - 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND, - ) + clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings if hs.config.metrics.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call( - phone_stats_home, - PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND, - hs, - stats, - ) + clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -258,6 +229,4 @@ async def generate_monthly_active_users() -> None: # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later( - INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats - ) + clock.call_later(5 * 60, phone_stats_home, hs, stats) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b821d1c1b43..4b0bdd79c67 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -47,7 +47,7 @@ ) from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.storage.engines import PostgresEngine, 
Sqlite3Engine +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES @@ -311,12 +311,6 @@ def __init__( self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update, ) - # Add a background update to add triggers which track event counts. - self.db_pool.updates.register_background_update_handler( - _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - self._event_stats_populate_counts_bg_update, - ) - # We want this to run on the main database at startup before we start processing # events. # @@ -2553,288 +2547,6 @@ def _txn( return num_rows - async def _event_stats_populate_counts_bg_update( - self, progress: JsonDict, batch_size: int - ) -> int: - """ - Background update to populate the `event_stats` table with initial - values, and register DB triggers to continue updating it. - - We first register TRIGGERs on rows being added/removed from the `events` table, - which will keep the event counts continuously updated. We also mark the stopping - point for the main population step so we don't double count events. - - Then we will iterate through the `events` table in batches and update event - counts until we reach the stopping point. - - This data is intended to be used by the phone-home stats to keep track - of total event and message counts. A trigger is preferred to counting - rows in the `events` table, as said table can grow quite large. - - It is also preferable to adding an index on the `events` table, as even - an index can grow large. And calculating total counts would require - querying that entire index. - """ - # The last event `stream_ordering` we processed (starting place of this next - # batch). - last_event_stream_ordering = progress.get( - "last_event_stream_ordering", -(1 << 31) - ) - # The event `stream_ordering` we should stop at. This is used to avoid double - # counting events that are already accounted for because of the triggers. - stop_event_stream_ordering: Optional[int] = progress.get( - "stop_event_stream_ordering", None - ) - - def _add_triggers_txn( - txn: LoggingTransaction, - ) -> Optional[int]: - """ - Adds the triggers to the `events` table to keep the `event_stats` counts - up-to-date. - - Also populates the `stop_event_stream_ordering` background update progress - value. This marks the point at which we added the triggers, so we can avoid - double counting events that are already accounted for in the population - step. - - Returns: - The latest event `stream_ordering` in the `events` table when the triggers - were added or `None` if the `events` table is empty. - """ - - # Each time an event is inserted into the `events` table, update the stats. - # - # We're using `AFTER` triggers as we want to count successful inserts/deletes and - # not the ones that could potentially fail. 
- if isinstance(txn.database_engine, Sqlite3Engine): - txn.execute( - """ - CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger - AFTER INSERT ON events - BEGIN - -- Always increment total_event_count - UPDATE event_stats SET total_event_count = total_event_count + 1; - - -- Increment unencrypted_message_count for m.room.message events - UPDATE event_stats - SET unencrypted_message_count = unencrypted_message_count + 1 - WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL; - - -- Increment e2ee_event_count for m.room.encrypted events - UPDATE event_stats - SET e2ee_event_count = e2ee_event_count + 1 - WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL; - END; - """ - ) - - txn.execute( - """ - CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger - AFTER DELETE ON events - BEGIN - -- Always decrement total_event_count - UPDATE event_stats SET total_event_count = total_event_count - 1; - - -- Decrement unencrypted_message_count for m.room.message events - UPDATE event_stats - SET unencrypted_message_count = unencrypted_message_count - 1 - WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL; - - -- Decrement e2ee_event_count for m.room.encrypted events - UPDATE event_stats - SET e2ee_event_count = e2ee_event_count - 1 - WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL; - END; - """ - ) - elif isinstance(txn.database_engine, PostgresEngine): - txn.execute( - """ - CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$ - BEGIN - IF TG_OP = 'INSERT' THEN - -- Always increment total_event_count - UPDATE event_stats SET total_event_count = total_event_count + 1; - - -- Increment unencrypted_message_count for m.room.message events - IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN - UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1; - END IF; - - -- Increment e2ee_event_count for m.room.encrypted events - IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN - UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1; - END IF; - - -- We're not modifying the row being inserted/deleted, so we return it unchanged. - RETURN NEW; - - ELSIF TG_OP = 'DELETE' THEN - -- Always decrement total_event_count - UPDATE event_stats SET total_event_count = total_event_count - 1; - - -- Decrement unencrypted_message_count for m.room.message events - IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN - UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1; - END IF; - - -- Decrement e2ee_event_count for m.room.encrypted events - IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN - UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1; - END IF; - - -- "The usual idiom in DELETE triggers is to return OLD." - -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html) - RETURN OLD; - END IF; - - RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). 
' - 'This indicates a trigger misconfiguration as this function should only' - 'run with INSERT/DELETE operations.', TG_OP; - END; - $BODY$ LANGUAGE plpgsql; - """ - ) - - # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres - # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html) - txn.execute( - """ - DO - $$BEGIN - CREATE TRIGGER event_stats_increment_counts_trigger - AFTER INSERT OR DELETE ON events - FOR EACH ROW - EXECUTE PROCEDURE event_stats_increment_counts(); - EXCEPTION - -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres - WHEN duplicate_object THEN - NULL; - END;$$; - """ - ) - else: - raise NotImplementedError("Unknown database engine") - - # Find the latest `stream_ordering` in the `events` table. We need to do - # this in the same transaction as where we add the triggers so we don't miss - # any events. - txn.execute( - """ - SELECT stream_ordering - FROM events - ORDER BY stream_ordering DESC - LIMIT 1 - """ - ) - row = cast(Optional[Tuple[int]], txn.fetchone()) - - # Update the progress - if row is not None: - (max_stream_ordering,) = row - self.db_pool.updates._background_update_progress_txn( - txn, - _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - {"stop_event_stream_ordering": max_stream_ordering}, - ) - return max_stream_ordering - - return None - - # First, add the triggers to keep the `event_stats` values up-to-date. - # - # If we don't have a `stop_event_stream_ordering` yet, we need to add the - # triggers to the `events` table and set the stopping point so we don't - # double count `events` later. - if stop_event_stream_ordering is None: - stop_event_stream_ordering = await self.db_pool.runInteraction( - "_event_stats_populate_counts_bg_update_add_triggers", - _add_triggers_txn, - ) - - # If there is no `stop_event_stream_ordering`, then there are no events - # in the `events` table and we can end the background update altogether. - if stop_event_stream_ordering is None: - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE - ) - return batch_size - - def _populate_txn( - txn: LoggingTransaction, - ) -> int: - """ - Updates the `event_stats` table from this batch of events. - """ - - # Increment the counts based on the events present in this batch. - txn.execute( - """ - WITH event_batch AS ( - SELECT * - FROM events - WHERE stream_ordering > ? AND stream_ordering <= ? - ORDER BY stream_ordering ASC - LIMIT ? 
- ), - batch_stats AS ( - SELECT - MAX(stream_ordering) AS max_stream_ordering, - COALESCE(COUNT(*), 0) AS total_event_count, - COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count, - COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count - FROM event_batch - - UNION ALL - - SELECT null, 0, 0, 0 - WHERE NOT EXISTS (SELECT 1 FROM event_batch) - LIMIT 1 - ) - UPDATE event_stats - SET - total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats), - unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats), - e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats) - RETURNING - (SELECT total_event_count FROM batch_stats) AS total_event_count, - (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering - """, - (last_event_stream_ordering, stop_event_stream_ordering, batch_size), - ) - - # Get the results of the update - (total_event_count, max_stream_ordering) = cast( - Tuple[int, Optional[int]], txn.fetchone() - ) - - # Update the progress - self.db_pool.updates._background_update_progress_txn( - txn, - _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - { - "last_event_stream_ordering": max_stream_ordering, - "stop_event_stream_ordering": stop_event_stream_ordering, - }, - ) - - return total_event_count - - num_rows_processed = await self.db_pool.runInteraction( - "_event_stats_populate_counts_bg_update", - _populate_txn, - ) - - # No more rows to process, so our background update is complete. - if not num_rows_processed: - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE - ) - - return batch_size - def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index a9cecc4bc1f..9ce1100b5ce 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -126,44 +126,6 @@ def _count_messages(txn: LoggingTransaction) -> int: return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages) - async def count_total_events(self) -> int: - """ - Returns the total number of events present on the server. - """ - - return await self.db_pool.simple_select_one_onecol( - table="event_stats", - keyvalues={}, - retcol="total_event_count", - desc="count_total_events", - ) - - async def count_total_messages(self) -> int: - """ - Returns the total number of `m.room.message` events present on the - server. - """ - - return await self.db_pool.simple_select_one_onecol( - table="event_stats", - keyvalues={}, - retcol="unencrypted_message_count", - desc="count_total_messages", - ) - - async def count_total_e2ee_events(self) -> int: - """ - Returns the total number of `m.room.encrypted` events present on the - server. 
- """ - - return await self.db_pool.simple_select_one_onecol( - table="event_stats", - keyvalues={}, - retcol="e2ee_event_count", - desc="count_total_e2ee_events", - ) - async def count_daily_sent_e2ee_messages(self) -> int: def _count_messages(txn: LoggingTransaction) -> int: # This is good enough as if you have silly characters in your own diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7474ba4542b..ad683a3a07b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 92 # remember to update the list below when updating +SCHEMA_VERSION = 91 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -162,12 +162,6 @@ Changes in SCHEMA_VERSION = 90 - Add a column `participant` to `room_memberships` table - Add background update to delete unreferenced state groups. - -Changes in SCHEMA_VERSION = 91 - - TODO - -Changes in SCHEMA_VERSION = 92 - - Add `event_stats` table to store global event statistics like total counts """ diff --git a/synapse/storage/schema/main/delta/92/01_event_stats.sql b/synapse/storage/schema/main/delta/92/01_event_stats.sql deleted file mode 100644 index 4bded035784..00000000000 --- a/synapse/storage/schema/main/delta/92/01_event_stats.sql +++ /dev/null @@ -1,33 +0,0 @@ --- --- This file is licensed under the Affero General Public License (AGPL) version 3. --- --- Copyright (C) 2025 New Vector, Ltd --- --- This program is free software: you can redistribute it and/or modify --- it under the terms of the GNU Affero General Public License as --- published by the Free Software Foundation, either version 3 of the --- License, or (at your option) any later version. --- --- See the GNU Affero General Public License for more details: --- <https://www.gnu.org/licenses/agpl-3.0.html>. - - --- Create the `event_stats` table to store these statistics. -CREATE TABLE event_stats ( - total_event_count INTEGER NOT NULL DEFAULT 0, - unencrypted_message_count INTEGER NOT NULL DEFAULT 0, - e2ee_event_count INTEGER NOT NULL DEFAULT 0 -); - --- Insert initial values into the table. -INSERT INTO event_stats ( - total_event_count, - unencrypted_message_count, - e2ee_event_count -) VALUES (0, 0, 0); - --- Add a background update to populate the `event_stats` table with the current counts --- from the `events` table and add triggers to keep this count up-to-date. -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (9201, 'event_stats_populate_counts_bg_update', '{}'); - diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index 73d19d91ed2..e03ff7ffc8c 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -52,5 +52,3 @@ class _BackgroundUpdates: MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = ( "mark_unreferenced_state_groups_for_deletion_bg_update" ) - - EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update" diff --git a/tests/metrics/test_phone_home_stats.py b/tests/metrics/test_phone_home_stats.py deleted file mode 100644 index 1b3eafed5fc..00000000000 --- a/tests/metrics/test_phone_home_stats.py +++ /dev/null @@ -1,258 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. 
-# -# Copyright (C) 2025 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. - -import logging -from unittest.mock import AsyncMock - -from twisted.test.proto_helpers import MemoryReactor - -from synapse.app.phone_stats_home import ( - PHONE_HOME_INTERVAL_SECONDS, - start_phone_stats_home, -) -from synapse.rest import admin, login, register, room -from synapse.server import HomeServer -from synapse.types import JsonDict -from synapse.util import Clock - -from tests import unittest -from tests.server import ThreadedMemoryReactorClock - -TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats" -TEST_SERVER_CONTEXT = "test-server-context" - - -class PhoneHomeStatsTestCase(unittest.HomeserverTestCase): - servlets = [ - admin.register_servlets_for_client_rest_resource, - room.register_servlets, - register.register_servlets, - login.register_servlets, - ] - - def make_homeserver( - self, reactor: ThreadedMemoryReactorClock, clock: Clock - ) -> HomeServer: - # Configure the homeserver to enable stats reporting. - config = self.default_config() - config["report_stats"] = True - config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT - - # Configure the server context so we can check it ends up being reported - config["server_context"] = TEST_SERVER_CONTEXT - - # Allow guests to be registered - config["allow_guest_access"] = True - - hs = self.setup_test_homeserver(config=config) - - # Replace the proxied http client with a mock, so we can inspect outbound requests to - # the configured stats endpoint. - self.put_json_mock = AsyncMock(return_value={}) - hs.get_proxied_http_client().put_json = self.put_json_mock # type: ignore[method-assign] - return hs - - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.store = hs.get_datastores().main - - # Wait for the background updates to add the database triggers that keep the - # `event_stats` table up-to-date. - self.wait_for_background_updates() - - # Force stats reporting to occur - start_phone_stats_home(hs=hs) - - super().prepare(reactor, clock, hs) - - def _get_latest_phone_home_stats(self) -> JsonDict: - # Wait for `phone_stats_home` to be called again + a healthy margin (50s). - self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50) - - # Extract the reported stats from our http client mock - mock_calls = self.put_json_mock.call_args_list - report_stats_calls = [] - for call in mock_calls: - if call.args[0] == TEST_REPORT_STATS_ENDPOINT: - report_stats_calls.append(call) - - self.assertGreaterEqual( - (len(report_stats_calls)), - 1, - "Expected at-least one call to the report_stats endpoint", - ) - - # Extract the phone home stats from the call - phone_home_stats = report_stats_calls[0].args[1] - - return phone_home_stats - - def _perform_user_actions(self) -> None: - """ - Perform some actions on the homeserver that would bump the phone home - stats. 
- """ - - # Create some users - user_1_mxid = self.register_user( - username="test_user_1", - password="test", - ) - user_2_mxid = self.register_user( - username="test_user_2", - password="test", - ) - # Note: `self.register_user` does not support guest registration, and updating the - # Admin API it calls to add a new parameter would cause the `mac` parameter to fail - # in a backwards-incompatible manner. Hence, we make a manual request here. - _guest_user_mxid = self.make_request( - method="POST", - path="/_matrix/client/v3/register?kind=guest", - content={ - "username": "guest_user", - "password": "test", - }, - shorthand=False, - ) - - # Log in to each user - user_1_token = self.login(username=user_1_mxid, password="test") - user_2_token = self.login(username=user_2_mxid, password="test") - - # Create a room between the two users - room_1_id = self.helper.create_room_as( - is_public=False, - tok=user_1_token, - ) - - # Mark this room as end-to-end encrypted - self.helper.send_state( - room_id=room_1_id, - event_type="m.room.encryption", - body={ - "algorithm": "m.megolm.v1.aes-sha2", - "rotation_period_ms": 604800000, - "rotation_period_msgs": 100, - }, - state_key="", - tok=user_1_token, - ) - - # User 1 invites user 2 - self.helper.invite( - room=room_1_id, - src=user_1_mxid, - targ=user_2_mxid, - tok=user_1_token, - ) - - # User 2 joins - self.helper.join( - room=room_1_id, - user=user_2_mxid, - tok=user_2_token, - ) - - # User 1 sends 10 unencrypted messages - for _ in range(10): - self.helper.send( - room_id=room_1_id, - body="Zoinks Scoob! A message!", - tok=user_1_token, - ) - - # User 2 sends 5 encrypted "messages" - for _ in range(5): - self.helper.send_event( - room_id=room_1_id, - type="m.room.encrypted", - content={ - "algorithm": "m.olm.v1.curve25519-aes-sha2", - "sender_key": "some_key", - "ciphertext": { - "some_key": { - "type": 0, - "body": "encrypted_payload", - }, - }, - }, - tok=user_2_token, - ) - - def test_phone_home_stats(self) -> None: - """ - Test that the phone home stats contain the stats we expect based on - the scenario carried out in `prepare` - """ - # Do things to bump the stats - self._perform_user_actions() - - # Wait for the stats to be reported - phone_home_stats = self._get_latest_phone_home_stats() - - self.assertEqual( - phone_home_stats["homeserver"], self.hs.config.server.server_name - ) - - self.assertTrue(isinstance(phone_home_stats["memory_rss"], int)) - self.assertTrue(isinstance(phone_home_stats["cpu_average"], int)) - - self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT) - - self.assertTrue(isinstance(phone_home_stats["timestamp"], int)) - self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int)) - self.assertTrue(isinstance(phone_home_stats["python_version"], str)) - - # We expect only our test users to exist on the homeserver - self.assertEqual(phone_home_stats["total_users"], 3) - self.assertEqual(phone_home_stats["total_nonbridged_users"], 3) - self.assertEqual(phone_home_stats["daily_user_type_native"], 2) - self.assertEqual(phone_home_stats["daily_user_type_guest"], 1) - self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0) - self.assertEqual(phone_home_stats["total_room_count"], 1) - self.assertEqual(phone_home_stats["total_event_count"], 24) - self.assertEqual(phone_home_stats["total_message_count"], 10) - self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5) - self.assertEqual(phone_home_stats["daily_active_users"], 2) - 
self.assertEqual(phone_home_stats["monthly_active_users"], 2) - self.assertEqual(phone_home_stats["daily_active_rooms"], 1) - self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1) - self.assertEqual(phone_home_stats["daily_messages"], 10) - self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5) - self.assertEqual(phone_home_stats["daily_sent_messages"], 10) - self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5) - - # Our users have not been around for >30 days, hence these are all 0. - self.assertEqual(phone_home_stats["r30v2_users_all"], 0) - self.assertEqual(phone_home_stats["r30v2_users_android"], 0) - self.assertEqual(phone_home_stats["r30v2_users_ios"], 0) - self.assertEqual(phone_home_stats["r30v2_users_electron"], 0) - self.assertEqual(phone_home_stats["r30v2_users_web"], 0) - self.assertEqual( - phone_home_stats["cache_factor"], self.hs.config.caches.global_factor - ) - self.assertEqual( - phone_home_stats["event_cache_size"], - self.hs.config.caches.event_cache_size, - ) - self.assertEqual( - phone_home_stats["database_engine"], - self.hs.config.database.databases[0].config["name"], - ) - self.assertEqual( - phone_home_stats["database_server_version"], - self.hs.get_datastores().main.database_engine.server_version, - ) - - synapse_logger = logging.getLogger("synapse") - log_level = synapse_logger.getEffectiveLevel() - self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level)) diff --git a/tests/storage/test_event_stats.py b/tests/storage/test_event_stats.py deleted file mode 100644 index 791ed27018e..00000000000 --- a/tests/storage/test_event_stats.py +++ /dev/null @@ -1,237 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright (C) 2025 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. - - -from twisted.test.proto_helpers import MemoryReactor - -from synapse.rest import admin, login, register, room -from synapse.server import HomeServer -from synapse.types.storage import _BackgroundUpdates -from synapse.util import Clock - -from tests import unittest - - -class EventStatsTestCase(unittest.HomeserverTestCase): - """ - Tests for the `event_stats` table - """ - - servlets = [ - admin.register_servlets_for_client_rest_resource, - room.register_servlets, - register.register_servlets, - login.register_servlets, - ] - - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.store = hs.get_datastores().main - - # Wait for the background updates to add the database triggers that keep the - # `event_stats` table up-to-date. - # - # This also prevents background updates running during the tests and messing - # with the results. - self.wait_for_background_updates() - - super().prepare(reactor, clock, hs) - - def _perform_user_actions(self) -> None: - """ - Perform some actions on the homeserver that would bump the event counts. 
- """ - # Create some users - user_1_mxid = self.register_user( - username="test_user_1", - password="test", - ) - user_2_mxid = self.register_user( - username="test_user_2", - password="test", - ) - # Note: `self.register_user` does not support guest registration, and updating the - # Admin API it calls to add a new parameter would cause the `mac` parameter to fail - # in a backwards-incompatible manner. Hence, we make a manual request here. - _guest_user_mxid = self.make_request( - method="POST", - path="/_matrix/client/v3/register?kind=guest", - content={ - "username": "guest_user", - "password": "test", - }, - shorthand=False, - ) - - # Log in to each user - user_1_token = self.login(username=user_1_mxid, password="test") - user_2_token = self.login(username=user_2_mxid, password="test") - - # Create a room between the two users - room_1_id = self.helper.create_room_as( - is_public=False, - tok=user_1_token, - ) - - # Mark this room as end-to-end encrypted - self.helper.send_state( - room_id=room_1_id, - event_type="m.room.encryption", - body={ - "algorithm": "m.megolm.v1.aes-sha2", - "rotation_period_ms": 604800000, - "rotation_period_msgs": 100, - }, - state_key="", - tok=user_1_token, - ) - - # User 1 invites user 2 - self.helper.invite( - room=room_1_id, - src=user_1_mxid, - targ=user_2_mxid, - tok=user_1_token, - ) - - # User 2 joins - self.helper.join( - room=room_1_id, - user=user_2_mxid, - tok=user_2_token, - ) - - # User 1 sends 10 unencrypted messages - for _ in range(10): - self.helper.send( - room_id=room_1_id, - body="Zoinks Scoob! A message!", - tok=user_1_token, - ) - - # User 2 sends 5 encrypted "messages" - for _ in range(5): - self.helper.send_event( - room_id=room_1_id, - type="m.room.encrypted", - content={ - "algorithm": "m.olm.v1.curve25519-aes-sha2", - "sender_key": "some_key", - "ciphertext": { - "some_key": { - "type": 0, - "body": "encrypted_payload", - }, - }, - }, - tok=user_2_token, - ) - - def test_background_update_with_events(self) -> None: - """ - Test that the background update to populate the `event_stats` table works - correctly when there are events in the database. - """ - # Do things to bump the stats - self._perform_user_actions() - - # Keep in mind: These are already populated as the background update has already - # ran once when Synapse started and added the database triggers which are - # incrementing things as new events come in. - self.assertEqual(self.get_success(self.store.count_total_events()), 24) - self.assertEqual(self.get_success(self.store.count_total_messages()), 10) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5) - - # Run the background update again - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - "progress_json": "{}", - }, - ) - ) - self.store.db_pool.updates._all_done = False - self.wait_for_background_updates() - - # We expect these values to double as the background update is being run *again* - # and will double-count the `events`. - self.assertEqual(self.get_success(self.store.count_total_events()), 48) - self.assertEqual(self.get_success(self.store.count_total_messages()), 20) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10) - - def test_background_update_without_events(self) -> None: - """ - Test that the background update to populate the `event_stats` table works - correctly without events in the database. 
- """ - # Keep in mind: These are already populated as the background update has already - # ran once when Synapse started and added the database triggers which are - # incrementing things as new events come in. - # - # In this case, no events have been sent, so we expect the counts to be 0. - self.assertEqual(self.get_success(self.store.count_total_events()), 0) - self.assertEqual(self.get_success(self.store.count_total_messages()), 0) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0) - - # Run the background update again - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - "progress_json": "{}", - }, - ) - ) - self.store.db_pool.updates._all_done = False - self.wait_for_background_updates() - - self.assertEqual(self.get_success(self.store.count_total_events()), 0) - self.assertEqual(self.get_success(self.store.count_total_messages()), 0) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0) - - def test_background_update_resume_progress(self) -> None: - """ - Test that the background update to populate the `event_stats` table works - correctly to resume from `progress_json`. - """ - # Do things to bump the stats - self._perform_user_actions() - - # Keep in mind: These are already populated as the background update has already - # ran once when Synapse started and added the database triggers which are - # incrementing things as new events come in. - self.assertEqual(self.get_success(self.store.count_total_events()), 24) - self.assertEqual(self.get_success(self.store.count_total_messages()), 10) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5) - - # Run the background update again - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, - "progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }', - }, - ) - ) - self.store.db_pool.updates._all_done = False - self.wait_for_background_updates() - - # We expect these values to increase as the background update is being run - # *again* and will double-count some of the `events` over the range specified - # by the `progress_json`. - self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7) - self.assertEqual(self.get_success(self.store.count_total_messages()), 16) - self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)