
Commit 931bc71

Merge pull request #11 from datasciencecampus/8-punctuality-by-geography
8 punctuality by geography
2 parents 09eba6e + 9ca496d commit 931bc71

18 files changed: +510 −29 lines

README.md

+7 −2
@@ -1,5 +1,5 @@
 <!--- Badges start --->
-<img src="https://img.shields.io/badge/repo%20status-in%20development%20(caution)-red" alt="Repository status is still in development (caution required)"/>
+<img src="https://img.shields.io/badge/repo%20status-in%20development%20(caution)-red" alt="Repository status is still in development (caution required)"/> <a href="https://codecov.io/gh/datasciencecampus/bus-metrics-england" > <img src="https://codecov.io/gh/datasciencecampus/bus-metrics-england/branch/dev/graph/badge.svg?token=hnkFyxDgV7"/></a>
 <!--- Badges end --->

 <img src="https://github.com/datasciencecampus/awesome-campus/blob/master/ons_dsc_logo.png">
@@ -30,10 +30,15 @@ You will also require a `.env` file in the format:
 BODS_API_KEY="<api key for the BODS service>"
 ```

-Data ingest scripts are now available. All resources (including geography and timetable data) and a sample 1 minute cut of real time data can be obtained:
+Data ingest scripts are now available. All resources (including geography and timetable data) and a sample 1 minute cut of real time data can be obtained. Punctuality can be acquired for any of 5 geographies using the `--geography` attribute:

 ```shell
 python setup.py
+
+# to be merged into setup
+python src/bus_metrics/aggregation/build_schedules.py
+
+python run.py -g lsoa
 ```

 ### Pre-commit actions
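The shell steps documented above map onto a single class introduced in this commit. A minimal sketch (not part of the commit) of the programmatic equivalent of `python run.py -g lsoa`, assuming the stop-level punctuality CSV and geography lookup table have already been produced by the preceding steps:

```python
# Sketch only: programmatic equivalent of `python run.py -g lsoa`.
# Assumes setup.py and build_schedules.py have already written the
# stop-level punctuality CSV and the geography lookup table locally.
from src.bus_metrics.aggregation.punctuality_rate import AggregationTool

tool = AggregationTool(geography="lsoa")  # any key under [boundaries] in ingest.toml
df = tool.punctuality_by_geography()  # also writes outputs/punctuality/lsoa_<timestamp>.csv
print(df.head())
```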

outputs/punctuality/.gitkeep

Whitespace-only changes.

requirements.txt

+3
@@ -10,6 +10,9 @@ pre-commit
 pyarrow==14.0
 pytest
 pytest-pythonpath
+pytest-mock
+pytest-randomly
 Rtree==1.1.0
 python-dotenv
 coverage
+gtfs-realtime-bindings>=0.0.7

run.py

+32
@@ -0,0 +1,32 @@
+"""Runs pipeline to reaggregate bus metrics by geography."""
+import argparse
+from src.bus_metrics.aggregation.punctuality_rate import AggregationTool
+
+
+def main():
+    """Reaggregate stop-level punctuality by selected geography.
+
+    Returns
+    -------
+    df: pandas.DataFrame
+        DataFrame of number of service stops and
+        punctuality rate by user-selected geography.
+
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-g", "--geography", nargs="?", help="which geography")
+    args = parser.parse_args()
+
+    if not args.geography:
+        geography = "lsoa"
+    else:
+        geography = args.geography
+
+    aTool = AggregationTool(geography=geography)
+    df = aTool.punctuality_by_geography()
+
+    return df
+
+
+if __name__ == "__main__":
+    main()
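`run.py` accepts any string for `-g/--geography` and only falls back to `lsoa` when the flag is omitted; an unrecognised value would surface later as a `KeyError` inside `AggregationTool`. One possible tightening, sketched here as an illustration rather than a change in this commit, is to derive the valid choices from the same `ingest.toml` the tool reads:

```python
# Illustrative sketch: constrain --geography to the [boundaries] keys in
# ingest.toml so bad values fail at parse time instead of deep in the tool.
import argparse

import toml

config = toml.load("src/bus_metrics/setup/ingest.toml")
valid_geographies = sorted(config["boundaries"])

parser = argparse.ArgumentParser()
parser.add_argument(
    "-g",
    "--geography",
    choices=valid_geographies,
    default="lsoa",
    help=f"geography to aggregate by: {', '.join(valid_geographies)}",
)
args = parser.parse_args()
```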

setup.py

+26 −1
@@ -1,6 +1,9 @@
 """Initial pipeline to obtain project resources/data."""
 from src.bus_metrics.setup.ingest_static_data import StaticDataIngest
 from src.bus_metrics.setup.ingest_realtime_data import RealtimeDataIngest
+from src.bus_metrics.setup.build_lookup import create
+
+# from src.bus_metrics.aggregation.build_schedules import Schedule_Builder
 from datetime import datetime
 import logging
 import os
@@ -36,7 +39,7 @@ def data_folder(logger: logging.Logger) -> None:


 if __name__ == "__main__":  # noqa: C901
-    session_name = f"ingest_{format(scriptStartTime, '%Y_%m_%d_%H:%M')}"
+    session_name = f"ingest_{format(datetime.now(), '%Y_%m_%d_%H:%M')}"
     logger = logging.getLogger(__name__)
     log_fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
     logging.basicConfig(
@@ -70,6 +73,10 @@ def data_folder(logger: logging.Logger) -> None:
         pass

     if ingest_toml["download_realtime_sample"]:
+
+        scriptStartTime = datetime.now()
+        scriptStartTimeUnix = time.mktime(scriptStartTime.timetuple())
+
         # TODO: more articulate ways of triggering every 10 seconds
         while (
             time.mktime(datetime.now().timetuple()) < scriptStartTimeUnix + 60
@@ -88,6 +95,24 @@ def data_folder(logger: logging.Logger) -> None:
         logger.warning("##Amend toml to download sample##")
         logger.warning("##Use shell script to download heavy##")

+    try:
+        logger.info("Building stops-geography lookup table")
+        # TODO: naming/access of function to be improved
+        create()
+    except Exception as e:
+        logger.warning(f"Build error: {e}")
+        pass
+
+    # build_schedules.py executed separately for now
+    # try:
+    #     schedule_builder = Schedule_Builder()
+    #     logger.info("Preparing punctuality data by stop")
+    #     # TODO: naming/access of method to be improved
+    #     schedule_builder.run()
+    # except Exception as e:
+    #     logger.warning(f"Build error: {e}")
+    #     pass
+
     logger.info("-----------------------------")
     logger.info("-------SETUP COMPLETED-------")
     logger.info("-----------------------------")

src/bus_metrics/aggregation/preprocessing.py

+6 −2
@@ -258,13 +258,17 @@ def convert_unix_to_time_string(
     return df


-def build_stops(output: str = "polars") -> pl.DataFrame | pd.DataFrame:
+def build_stops(
+    output: str = "polars", stops_data: str = "data/resources/gb_stops.csv"
+) -> pl.DataFrame | pd.DataFrame:
     """Read in gb_stops file and outouts as DataFrame.

     Parameters
     ----------
     output : str
         Output type, polars or pandas DataFrame. (Defaults "polars").
+    stops_data : str
+        Filepath to raw NAPTAN stops data locally.

     Returns
     -------
@@ -274,7 +278,7 @@ def build_stops(output: str = "polars") -> pl.DataFrame | pd.DataFrame:
     """
     # import NapTAN data
     stops = pl.read_csv(
-        "data/resources/gb_stops.csv",
+        stops_data,
         ignore_errors=True,
         dtypes={"stop_id": pl.Utf8},  # noqa: E501
     )
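Parameterising the stops path means `build_stops` can be pointed at a small fixture in tests instead of the full NAPTAN extract. A short usage sketch; the fixture path is hypothetical and `output="pandas"` follows the behaviour the docstring describes:

```python
# Sketch only: the fixture path below is hypothetical.
from src.bus_metrics.aggregation.preprocessing import build_stops

stops_pl = build_stops()  # polars DataFrame from data/resources/gb_stops.csv
stops_pd = build_stops(
    output="pandas", stops_data="tests/data/gb_stops_sample.csv"
)
```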

src/bus_metrics/aggregation/punctuality_rate.py

+133
@@ -0,0 +1,133 @@
+"""Class of tools required to reaggregate bus metrics by geographies."""
+import pandas as pd
+import toml
+from datetime import datetime
+
+
+class AggregationTool:
+    """Aggregate bus metrics by geographies.
+
+    Parameters
+    ----------
+    stop_level_punctuality: str
+        Full filepath to stop-level punctuality aggregated data
+    config: dict
+        Dictionary of imported toml ingest variables
+    geography: str
+        Geography by which data is to be aggregated
+
+    Attributes
+    ----------
+    code: str
+        Geography code in boundaries data e.g. LSOA21CD
+    name: str
+        Geography name in boundaries data e.g. LSOA21NM
+
+    Methods
+    -------
+    punctuality_by_geography
+        Combine stop-level punctuality data with boundaries
+        and reaggregate
+
+    """
+
+    def __init__(
+        self,
+        config: dict = toml.load("src/bus_metrics/setup/ingest.toml"),
+        geography_lookup_table: str = "data/resources/geography_lookup_table.csv",  # noqa: E501
+        geography: str = "lsoa",
+        outdir: str = "outputs/punctuality",
+    ) -> None:
+
+        self.region = config["region_to_analyse"]
+        self.date = datetime.now().strftime("%Y%m%d")
+        self.stop_level_punctuality: str = f"data/stop_level_punctuality/punctuality_by_stop_{self.region}_{self.date}.csv"  # noqa: E501
+        self.geography_lookup_table = geography_lookup_table
+        self.geography = geography
+        self.config = config
+        self.code = self.config["boundaries"][self.geography]["code"]
+        self.name = self.config["boundaries"][self.geography]["name"]
+        self.outdir = outdir
+
+    def merge_geographies_with_stop_punctuality(
+        self,
+    ) -> pd.DataFrame | Exception:
+        """Merge geography labels and stop-level punctuality.
+
+        Returns
+        -------
+        df: pandas.DataFrame
+            Dataframe of stop-level punctuality with all
+            associated geography labels.
+
+        Raises
+        ------
+        FileNotFoundError
+            When either stops or geography lookup
+            do not exist locally.
+
+        """
+        try:
+            stops = pd.read_csv(self.stop_level_punctuality, index_col=0)
+            lookup = pd.read_csv(self.geography_lookup_table, index_col=0)
+            df = pd.merge(
+                stops,
+                lookup,
+                on=["stop_id", "stop_lat", "stop_lon"],
+                how="left",
+            )
+            return df
+
+        except FileNotFoundError as e:
+            print(e, "Please re-run the build_lookup.py script.")
+            raise
+
+    def _reaggregate_punctuality(
+        self, labelled: pd.DataFrame = None
+    ) -> pd.DataFrame:
+        """Re-aggregate stop-level punctuality by specified geography.
+
+        Parameters
+        ----------
+        labelled: pandas.DataFrame
+            Dataframe of stop-level punctuality with all
+            available geography labels associated with each.
+
+        Returns
+        -------
+        df: pandas.DataFrame
+            Dataframe of number of service stops
+            and punctuality rate aggregated by geography.
+
+        """
+        code = self.config["boundaries"][self.geography]["code"]
+        name = self.config["boundaries"][self.geography]["name"]
+
+        labelled["punctual_service_stops"] = (
+            labelled["service_stops"] * labelled["punctuality_rate"]
+        ).astype(int)
+        df = labelled.groupby([code, name]).agg(
+            {"service_stops": "sum", "punctual_service_stops": "sum"}
+        )
+        df["punctuality_rate"] = (
+            df["punctual_service_stops"] / df["service_stops"]
+        )
+        df = df.reset_index()
+
+        return df
+
+    def punctuality_by_geography(self):
+        """Collect punctuality data, reaggregate and store locally.
+
+        Returns
+        -------
+        df: pandas.DataFrame
+            Dataframe of number of service stops
+            and punctuality rate aggregated by geography.
+
+        """
+        date_time = datetime.now().strftime("%Y%m%d-%H%M%S")
+        df = self.merge_geographies_with_stop_punctuality()
+        df = self._reaggregate_punctuality(df)
+        df.to_csv(f"{self.outdir}/{self.geography}_{date_time}.csv")
+        return df
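`_reaggregate_punctuality` recovers punctual stop counts from each stop's rate, sums both counts within a geography, and divides again, so the result is a service-stop-weighted rate rather than a simple mean of stop-level rates. A toy check of that arithmetic (synthetic numbers; the LSOA column names just follow the example in the class docstring):

```python
# Toy illustration of the weighted aggregation used in _reaggregate_punctuality.
import pandas as pd

labelled = pd.DataFrame(
    {
        "LSOA21CD": ["E01000001", "E01000001"],
        "LSOA21NM": ["Area A", "Area A"],
        "service_stops": [100, 10],
        "punctuality_rate": [0.90, 0.50],
    }
)

labelled["punctual_service_stops"] = (
    labelled["service_stops"] * labelled["punctuality_rate"]
).astype(int)
agg = labelled.groupby(["LSOA21CD", "LSOA21NM"]).agg(
    {"service_stops": "sum", "punctual_service_stops": "sum"}
)
agg["punctuality_rate"] = agg["punctual_service_stops"] / agg["service_stops"]

# (90 + 5) / 110 ≈ 0.864, not the unweighted mean of 0.70
print(agg.reset_index())
```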

src/bus_metrics/setup/build_lookup.py

+96
@@ -0,0 +1,96 @@
+"""Tool to generate lookup table for stops to various geography levels."""
+
+import toml
+import geopandas as gpd
+import pandas as pd
+from src.bus_metrics.setup.ingest_static_data import StaticDataIngest
+
+
+def _build_lookup_tool(
+    stops: pd.DataFrame,
+    bounds: gpd.GeoDataFrame,
+    bounds_code: str,
+    bounds_name: str,
+) -> pd.DataFrame:
+    """Allocate geography labels to bus stops by means of a spatial join.
+
+    Parameters
+    ----------
+    stops: pandas.DataFrame
+        Dataframe of NAPTAN stops data.
+    bounds: geopandas.GeoDataFrame
+        Dataframe of geography boundaries data.
+    bounds_code: str
+        Geography code e.g. LSOA21CD
+    bounds_name: str
+        Geography name e.g. LSOA21NM
+
+    Returns
+    -------
+    df: pandas.DataFrame
+        Dataframe of stops-geography lookup table.
+
+    """
+    stops["geometry"] = gpd.points_from_xy(
+        stops["stop_lon"], stops["stop_lat"]
+    )
+    stops = gpd.GeoDataFrame(stops)
+    stops = stops.set_crs("4326")
+    df = stops.sjoin(bounds, how="left", predicate="within")
+
+    cols = list(stops.columns)
+    cols.extend((bounds_code, bounds_name))
+    df = df[cols]
+
+    return df
+
+
+def create() -> pd.DataFrame:
+    """Download and process boundaries data. Label stops.
+
+    Returns
+    -------
+    stops: pandas.DataFrame
+        Dataframe of stops-geography lookup table.
+
+    """
+    installer = StaticDataIngest()
+    config = toml.load("src/bus_metrics/setup/ingest.toml")
+    boundaries = config["boundaries"]
+
+    # TODO: address mixed dtypes pandas DtypeWarning
+    stops = pd.read_csv("data/resources/gb_stops.csv", index_col=0)
+    stops = stops[stops["Status"] == "active"]
+    stops = stops[["ATCOCode", "Latitude", "Longitude"]]
+    stops.columns = ["stop_id", "stop_lat", "stop_lon"]
+
+    # TODO: consider tqdm progress bar for large file downloads
+    for geog in boundaries:
+        url = boundaries[geog]["url"]
+        filename = boundaries[geog]["filename"]
+        code = boundaries[geog]["code"]
+        name = boundaries[geog]["name"]
+
+        try:
+            print(
+                f"Downloading/Processing FULL RES(!) {geog} boundary data..."
+            )
+            installer.ingest_data_from_geoportal(url, filename)
+
+        except FileExistsError:
+            pass
+
+        bounds = gpd.read_file(filename)
+        stops = _build_lookup_tool(stops, bounds, code, name)
+        stops = stops.drop(columns="geometry")
+
+    print("Storing lookup file...")
+    # note: currently retains all stops in NAPTAN data
+    # irrespective of location across UK
+    stops.to_csv("data/resources/geography_lookup_table.csv")
+
+    return stops
+
+
+if __name__ == "__main__":
+    create()
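`_build_lookup_tool` relies on a left `within` spatial join from stop points to boundary polygons, so any stop outside every polygon keeps NaN geography labels. A toy end-to-end check of that join (synthetic coordinates and a single square polygon; the LSOA column names are illustrative):

```python
# Toy illustration of the point-in-polygon join used by _build_lookup_tool.
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon

stops = pd.DataFrame(
    {"stop_id": ["A", "B"], "stop_lat": [0.5, 5.0], "stop_lon": [0.5, 5.0]}
)
stops["geometry"] = gpd.points_from_xy(stops["stop_lon"], stops["stop_lat"])
stops = gpd.GeoDataFrame(stops, geometry="geometry", crs="EPSG:4326")

bounds = gpd.GeoDataFrame(
    {"LSOA21CD": ["E01000001"], "LSOA21NM": ["Area A"]},
    geometry=[Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])],
    crs="EPSG:4326",
)

# stop A falls inside the square; stop B keeps NaN labels from the left join
joined = stops.sjoin(bounds, how="left", predicate="within")
print(joined[["stop_id", "LSOA21CD", "LSOA21NM"]])
```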
