Skip to content

Commit 2d0930f

Browse files
committedFeb 23, 2025
update homework for week 4
1 parent 7ccc422 commit 2d0930f

File tree

3 files changed

+180
-0
lines changed

3 files changed

+180
-0
lines changed
 

‎project/homework.yml

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# Kestra flow header: flow id and namespace, followed by flow-level variables.
# The three variables are read from the Kestra KV store and are consumed by the
# Python extract script as vars.username, vars.password and vars.app_token when
# it builds the Socrata client for data.cityofchicago.org.
id: chicago_traffic_data_vehicles
2+
namespace: dev
3+
4+
5+
variables:
6+
username: "{{kv('API_KEY_ID')}}"
7+
password: "{{kv('API_KEY_SECRET')}}"
8+
app_token: "{{kv('app_token')}}"
9+
10+
11+
# tasks:
12+
# - id: loop
13+
# type: io.kestra.plugin.core.flow.ForEach
14+
# values:
15+
# - "u6pd-qa9d"
16+
# - "85ca-t3if"
17+
# - "68nd-jvt3"
18+
19+
20+
tasks:
21+
- id: extract_traffic_crashes
22+
type: io.kestra.plugin.scripts.python.Script
23+
outputFiles:
24+
- "output_dataset.csv"
25+
26+
beforeCommands:
27+
- pip install pandas
28+
- pip install sodapy
29+
script: |
30+
# Download vehicle records of the Chicago traffic-crash dataset from the
# Socrata API page by page and append them to a single CSV file.
#
# Output: "output_dataset.csv" in the working directory, one header row, the
# columns listed in COLUMNS_ORDER, and at most one row per crash_unit_id.
import numpy as np
import pandas as pd
from sodapy import Socrata

DATASET_ID = "68nd-jvt3"
OUTPUT_PATH = "output_dataset.csv"
CHUNK_SIZE = 1000
# Cap on the number of rows requested per run. The original script stopped as
# soon as the offset reached 2000; raise this value to fetch more of the dataset.
MAX_ROWS = 2000
COLUMNS_ORDER = ['crash_unit_id', 'crash_record_id', 'crash_date', 'unit_no', 'unit_type', 'vehicle_id', 'make', 'model','vehicle_defect','vehicle_type', 'vehicle_use', 'travel_direction', 'maneuver', 'occupant_cnt', 'first_contact_point', 'vehicle_year']

client = Socrata("data.cityofchicago.org", "{{vars.app_token}}", username="{{vars.username}}", password="{{vars.password}}")

seen_ids = set()
first_chunk = True
offset = 0

try:
    while offset < MAX_ROWS:
        # An explicit order keeps limit/offset paging stable; without it the
        # API does not guarantee that consecutive pages are disjoint.
        results = client.get(DATASET_ID, limit=CHUNK_SIZE, offset=offset, order=":id")
        if not results:
            break

        df = pd.DataFrame.from_records(results)

        # Validate before any transformation so a schema change fails with a
        # clear message instead of an unrelated KeyError further down.
        if 'crash_unit_id' not in df.columns:
            raise ValueError("Dataset does not contain a unique 'crash_unit_id' column.")

        # Select the wanted columns by name. reindex() also creates any column
        # that is absent from this page (the API omits fields that are empty
        # for every returned row), so sparse pages no longer raise KeyError.
        df = df.reindex(columns=COLUMNS_ORDER)

        # Empty strings and missing values are written as the literal "null".
        df = df.replace("", np.nan).fillna("null")
        # Unparseable dates (including the "null" placeholder) become NaT.
        df['crash_date'] = pd.to_datetime(df['crash_date'], errors='coerce')

        # Drop duplicates inside this page, then rows already written earlier.
        df = df.drop_duplicates(subset='crash_unit_id')
        df = df[~df['crash_unit_id'].isin(seen_ids)]
        seen_ids.update(df['crash_unit_id'])

        if not df.empty:
            # Append mode: only the first non-empty page writes the header.
            df.to_csv(OUTPUT_PATH, mode='a', index=False, header=first_chunk)
            first_chunk = False

        offset += CHUNK_SIZE
finally:
    # Release the underlying HTTP session even if a page fails.
    client.close()

print("Data fetching and merging completed without duplicates.")
71+
72+
# Debug helper: returns the internal-storage URI of the CSV produced by the
# extract_traffic_crashes task. Outputs are addressed by the producing task's
# id, so the expression must name extract_traffic_crashes (not "extract").
- id: debug_outputs
  type: io.kestra.plugin.core.debug.Return
  format: "{{ outputs.extract_traffic_crashes.outputFiles['output_dataset.csv'] }}"
75+
# Uploads the CSV written by the extract_traffic_crashes task to the GCS
# bucket. The source expression must reference the producing task by its id,
# extract_traffic_crashes; no task named "extract" exists in this flow.
- id: upload_vehicle_data_to_gcs
  type: io.kestra.plugin.gcp.gcs.Upload
  from: "{{ outputs.extract_traffic_crashes.outputFiles['output_dataset.csv'] }}"
  to: "gs://{{kv('GCP_BUCKET_NAME')}}/traffic_crashes_vehicles.csv"
80+
81+
# - id: purge_files
82+
# type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
83+
# description: To avoid cluttering your storage, we will remove the downloaded files
84+
85+
86+
# Creates the destination table vehicles_data if it does not exist yet.
# The table carries a BYTES key column (unique_id_vehicle) plus the sixteen
# vehicle columns, and is partitioned by the date part of crash_date.
# The project and dataset names are read from the Kestra KV store.
- id: bq_vehicles_data
87+
type: io.kestra.plugin.gcp.bigquery.Query
88+
sql: |
89+
CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data`(
90+
unique_id_vehicle BYTES OPTIONS(description = 'Unique key for this dataset'),
91+
crash_unit_id INTEGER OPTIONS (description = 'A unique identifier for each vehicle record.'),
92+
crash_record_id STRING OPTIONS (description = 'This number can be used to link to the same crash in the Crashes and People datasets. This number also serves as a unique ID in the Crashes dataset.'),
93+
crash_date TIMESTAMP OPTIONS (description = 'Date and time of crash as entered by the reporting officer'),
94+
unit_no INTEGER OPTIONS (description = 'A unique ID for each unit within a specific crash report.'),
95+
unit_type STRING OPTIONS (description ='The type of unit'),
96+
vehicle_id STRING OPTIONS (description = '') ,
97+
make STRING OPTIONS (description = 'The make (brand) of the vehicle, if relevant'),
98+
model STRING OPTIONS (description = 'The model of the vehicle, if relevant'),
99+
vehicle_defect STRING OPTIONS (description = '') ,
100+
vehicle_type STRING OPTIONS (description = 'The type of vehicle, if relevant'),
101+
vehicle_use STRING OPTIONS (description ='The normal use of the vehicle, if relevant'),
102+
travel_direction STRING OPTIONS (description = 'The direction in which the unit was traveling prior to the crash, as determined by the reporting officer'),
103+
maneuver STRING OPTIONS (description = 'The action the unit was taking prior to the crash, as determined by the reporting officer'), occupant_cnt STRING OPTIONS (description = 'The number of people in the unit, as determined by the reporting officer'),
104+
first_contact_point STRING OPTIONS (description = '') ,
105+
vehicle_year STRING OPTIONS (description = 'The model year of the vehicle, if relevant')
106+
)
107+
PARTITION BY DATE(crash_date)
108+
109+
110+
# (Re)creates the external table vehicles_data_ext over the CSV object
# traffic_crashes_vehicles.csv in the GCS bucket - the same object path the
# upload task writes to. The first CSV row is skipped as a header and unknown
# values are ignored. Column order matches the column order of the CSV
# produced by the extract script.
- id: bq_vehicles_data_external
111+
type: io.kestra.plugin.gcp.bigquery.Query
112+
sql: |
113+
CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data_ext`(
114+
crash_unit_id INTEGER OPTIONS (description = 'A unique identifier for each vehicle record.'),
115+
crash_record_id STRING OPTIONS (description = 'This number can be used to link to the same crash in the Crashes and People datasets. This number also serves as a unique ID in the Crashes dataset.'),
116+
crash_date TIMESTAMP OPTIONS (description = 'Date and time of crash as entered by the reporting officer'),
117+
unit_no INTEGER OPTIONS (description = 'A unique ID for each unit within a specific crash report.'),
118+
unit_type STRING OPTIONS (description ='The type of unit'),
119+
vehicle_id STRING OPTIONS (description = '') ,
120+
make STRING OPTIONS (description = 'The make (brand) of the vehicle, if relevant'),
121+
model STRING OPTIONS (description = 'The model of the vehicle, if relevant'),
122+
vehicle_defect STRING OPTIONS (description = '') ,
123+
vehicle_type STRING OPTIONS (description = 'The type of vehicle, if relevant'),
124+
vehicle_use STRING OPTIONS (description ='The normal use of the vehicle, if relevant'),
125+
travel_direction STRING OPTIONS (description = 'The direction in which the unit was traveling prior to the crash, as determined by the reporting officer'),
126+
maneuver STRING OPTIONS (description = 'The action the unit was taking prior to the crash, as determined by the reporting officer'), occupant_cnt STRING OPTIONS (description = 'The number of people in the unit, as determined by the reporting officer'),
127+
first_contact_point STRING OPTIONS (description = '') ,
128+
vehicle_year STRING OPTIONS (description = 'The model year of the vehicle, if relevant')
129+
130+
)
131+
OPTIONS (
132+
format = 'CSV',
133+
uris = ["gs://{{kv('GCP_BUCKET_NAME')}}/traffic_crashes_vehicles.csv"],
134+
skip_leading_rows = 1,
135+
ignore_unknown_values = TRUE
136+
)
137+
138+
139+
# (Re)creates the staging table vehicles_data_tmp from the external table,
# adding unique_id_vehicle = MD5 of the concatenated string forms of
# crash_unit_id, crash_record_id, crash_date and unit_no (NULLs become "").
# NOTE(review): the four parts are concatenated without a delimiter, so
# different value combinations can in principle produce the same input string.
# Changing the expression would change every key already stored in
# vehicles_data, so it is documented here rather than altered.
- id: bq_vehicles_table_tmp
140+
type: io.kestra.plugin.gcp.bigquery.Query
141+
sql: |
142+
CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data_tmp`
143+
AS
144+
SELECT
145+
MD5(CONCAT(
146+
COALESCE(CAST(crash_unit_id AS STRING), ""),
147+
COALESCE(CAST(crash_record_id AS STRING), ""),
148+
COALESCE(CAST(crash_date AS STRING), ""),
149+
COALESCE(CAST(unit_no AS STRING), "")
150+
)) AS unique_id_vehicle,
151+
*
152+
FROM `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data_ext`
153+
154+
# Inserts into vehicles_data every row of vehicles_data_tmp whose
# unique_id_vehicle is not already present; matched rows are left untouched
# (there is no WHEN MATCHED clause).
# NOTE(review): the task id says "persons" although the statement merges
# vehicle data - looks like a leftover from a copied flow; confirm before
# renaming, since the id is the task's external name.
- id: bq_persons_merge
155+
type: io.kestra.plugin.gcp.bigquery.Query
156+
sql: |
157+
MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data` T
158+
USING `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.vehicles_data_tmp` S
159+
ON T.unique_id_vehicle = S.unique_id_vehicle
160+
WHEN NOT MATCHED THEN
161+
INSERT (unique_id_vehicle, crash_unit_id, crash_record_id, crash_date, unit_no, unit_type, vehicle_id, make, model,vehicle_defect,vehicle_type, vehicle_use, travel_direction, maneuver, occupant_cnt, first_contact_point, vehicle_year )
162+
VALUES (S.unique_id_vehicle, S.crash_unit_id, S.crash_record_id, S.crash_date, S.unit_no, S. unit_type, S.vehicle_id, S.make, S.model, S.vehicle_defect, S.vehicle_type, S.vehicle_use, S.travel_direction, S.maneuver, S.occupant_cnt, S.first_contact_point,S.vehicle_year );
163+
164+
165+
# Default property values for tasks of the io.kestra.plugin.gcp plugin family
# (the GCS upload and the BigQuery queries in this flow). Service-account
# credentials, project id, location and bucket are all read from the Kestra
# KV store, so no secret is stored in the flow itself.
pluginDefaults:
166+
- type: io.kestra.plugin.gcp
167+
values:
168+
serviceAccount: "{{kv('GCP_CREDS')}}"
169+
projectId: "{{kv('GCP_PROJECT_ID')}}"
170+
location: "{{kv('GCP_LOCATION')}}"
171+
bucket: "{{kv('GCP_BUCKET_NAME')}}"

‎week_4_analytics/homework/hw_file

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Rows of the monthly p90 travel-time model for trips in November 2019 whose
-- zone is "Yorkville East", longest p90 travel time first; the top row is
-- skipped and the next two are returned.
-- NOTE(review): the committed statement began at FROM with no SELECT list and
-- could not run as written; SELECT * is assumed - narrow it to the intended
-- columns if only some are needed.
SELECT
  *
FROM
  `dataengineerproject-448203.taxi_rides.fct_fhv_monthly_zone_traveltime_p90`
WHERE fhv_month = 11 AND fhv_year = 2019 AND zone = "Yorkville East"
ORDER BY travel_time_p90 DESC
LIMIT 2 OFFSET 1
6+
7+
8+
-- Added all my files to the dbt project.

‎workshop_dlt/homework.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import dlt
22
import os
3+
34
import json
45
from dlt.sources.helpers.rest_client import RESTClient
56
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

0 commit comments

Comments
 (0)