
Commit 5546ff6: Initial Commit
Committed Feb 11, 2025
1 parent: 0fc1ccd
6 files changed: +194, −0 lines

IRCTC Flowchart.png (binary file, 188 KB)

bigquery_create_table.sql (+15 lines)
-- Target table for the IRCTC streaming pipeline (written to by Dataflow).
CREATE TABLE `irctc_dwh.irctc_stream_tb` (
  row_key STRING,
  name STRING,
  age INT64,
  email STRING,
  join_date DATE,
  last_login TIMESTAMP,
  loyalty_points INT64,
  account_balance FLOAT64,
  is_active BOOL,
  inserted_at TIMESTAMP,
  updated_at TIMESTAMP,
  loyalty_status STRING,
  account_age_days INT64
);
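The DDL above can be applied from the bq CLI or from Python. A minimal sketch, assuming the google-cloud-bigquery package is installed, the irctc_dwh dataset already exists, and the .sql file sits next to the script (this helper is illustrative, not part of the commit):

# Illustrative only: apply bigquery_create_table.sql with the BigQuery client.
# Assumes the irctc_dwh dataset already exists in the project.
from google.cloud import bigquery

client = bigquery.Client(project="gds-project-432013")
with open("bigquery_create_table.sql") as f:
    ddl = f.read()
client.query(ddl).result()  # runs the CREATE TABLE statement and waits for it
print("Table irctc_dwh.irctc_stream_tb created.")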

config.py (+9 lines)
# config.py

PROJECT_ID = "gds-project-432013"              # Your GCP Project ID
TOPIC_ID = "irctc-data"                        # Pub/Sub topic for data ingestion
SUBSCRIPTION_ID = "irctc-data-sub"             # Pub/Sub subscription for streaming
BQ_DATASET = "irctc_dwh"                       # BigQuery dataset name
BQ_TABLE = "irctc_stream_tb"                   # BigQuery table name
TEMP_LOCATION = "gs://your-bucket/temp"        # GCS bucket for Dataflow temp files
STAGING_LOCATION = "gs://your-bucket/staging"  # GCS bucket for Dataflow staging files
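TEMP_LOCATION and STAGING_LOCATION still contain the gs://your-bucket placeholder, so a quick startup check can save a failed Dataflow launch. A small sketch; the check itself is an assumption, not part of the commit:

# Illustrative sanity check: fail fast if the GCS placeholders were not replaced.
import config

for name in ("TEMP_LOCATION", "STAGING_LOCATION"):
    if "your-bucket" in getattr(config, name):
        raise ValueError(f"{name} still points at the placeholder bucket; edit config.py")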

dataflow_pipeline.py (+39 lines)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import json
import config

class TransformData(beam.DoFn):
    def process(self, element):
        try:
            record = json.loads(element.decode("utf-8"))
            record['name'] = record.get('name', '').title()    # Capitalize name
            record['email'] = record.get('email', '').lower()  # Convert email to lowercase
            record['loyalty_status'] = 'Platinum' if record.get('loyalty_points', 0) > 500 else 'Standard'
            yield record
        except Exception as e:
            print(f"Error processing record: {e}")

def run():
    pipeline_options = PipelineOptions(
        streaming=True,
        project=config.PROJECT_ID,
        temp_location=config.TEMP_LOCATION,
        staging_location=config.STAGING_LOCATION
    )
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=f"projects/{config.PROJECT_ID}/subscriptions/{config.SUBSCRIPTION_ID}")
            | "Transform Data" >> beam.ParDo(TransformData())
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=f"{config.PROJECT_ID}:{config.BQ_DATASET}.{config.BQ_TABLE}",
                schema="row_key:STRING, name:STRING, age:INTEGER, email:STRING, join_date:DATE, last_login:TIMESTAMP, loyalty_points:INTEGER, account_balance:FLOAT, is_active:BOOLEAN, inserted_at:TIMESTAMP, updated_at:TIMESTAMP, loyalty_status:STRING, account_age_days:INTEGER",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == "__main__":
    run()
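As written, run() falls back to Beam's default DirectRunner because no runner is set. A hedged sketch of the extra options a managed Dataflow run would need; the region and job name are illustrative assumptions, not from the repo:

# Sketch only: options for running the same pipeline on the Dataflow service.
from apache_beam.options.pipeline_options import PipelineOptions
import config

dataflow_options = PipelineOptions(
    streaming=True,
    runner="DataflowRunner",           # managed Dataflow instead of local execution
    project=config.PROJECT_ID,
    region="us-central1",              # assumed region
    job_name="irctc-stream-pipeline",  # assumed job name
    temp_location=config.TEMP_LOCATION,
    staging_location=config.STAGING_LOCATION,
)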

irctc_mock_data_to_pubsub.py (+70 lines)
from google.cloud import pubsub_v1
import random
import string
import uuid
import json
from datetime import datetime, timedelta

# Configuration
project_id = "gds-project-432013"
topic_id = "irctc-data"

def initialize_pubsub():
    try:
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(project_id, topic_id)
        return publisher, topic_path
    except Exception as e:
        print(f"Failed to initialize Pub/Sub client: {e}")
        raise

# Generate mock data
def generate_mock_data(num_rows):
    try:
        data = []
        for _ in range(num_rows):
            row_key = str(uuid.uuid4())
            row_data = {
                "row_key": row_key,
                "name": ''.join(random.choices(string.ascii_letters, k=10)),
                "age": random.randint(18, 90),
                "email": ''.join(random.choices(string.ascii_lowercase, k=5)) + "@example.com",
                "join_date": (datetime.now() - timedelta(days=random.randint(0, 3650))).strftime('%Y-%m-%d'),
                "last_login": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "loyalty_points": random.randint(0, 1000),
                "account_balance": round(random.uniform(100, 10000), 2),
                "is_active": random.choice([True, False]),
                "inserted_at": datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                "updated_at": None
            }
            data.append(row_data)
        return data
    except Exception as e:
        print(f"Failed to generate mock data: {e}")
        raise

# Publish data to Pub/Sub
def publish_to_pubsub(publisher, topic_path, data):
    try:
        for record in data:
            message_json = json.dumps(record)
            message_bytes = message_json.encode('utf-8')
            future = publisher.publish(topic_path, data=message_bytes)
            print(f"Data -> {message_json}")
            print(f"Published message ID: {future.result()}")
        print(f"Published {len(data)} messages successfully.")
    except Exception as e:
        print(f"Failed to publish data: {e}")
        raise

# Main execution
if __name__ == "__main__":
    try:
        publisher, topic_path = initialize_pubsub()

        # Generate and publish mock data
        mock_data = generate_mock_data(20)
        publish_to_pubsub(publisher, topic_path, mock_data)
    except Exception as e:
        print(f"An error occurred during the execution: {e}")

transform_udf.py (+61 lines)
import json
from datetime import datetime

def transform_data(element):
    try:
        # Parse the JSON message
        record = json.loads(element.replace("'", "\""))

        # Data Cleaning and Validation
        record['row_key'] = record.get('row_key', '')
        record['name'] = record.get('name', '').title()  # Capitalize the name
        record['email'] = record.get('email', '').lower()  # Ensure email is lowercase
        record['is_active'] = bool(record.get('is_active', False))  # Ensure is_active is a boolean

        # Enriching Data
        record['loyalty_status'] = 'Platinum' if record.get('loyalty_points', 0) > 500 else 'Standard'

        # Convert inserted_at and updated_at to ISO format, handle missing or invalid timestamps
        inserted_at = record.get('inserted_at')
        updated_at = record.get('updated_at')

        if inserted_at:
            try:
                record['inserted_at'] = datetime.strptime(inserted_at, '%Y-%m-%d %H:%M:%S').isoformat()
            except ValueError:
                record['inserted_at'] = datetime.utcnow().isoformat()
        else:
            record['inserted_at'] = datetime.utcnow().isoformat()

        if updated_at:
            try:
                record['updated_at'] = datetime.strptime(updated_at, '%Y-%m-%d %H:%M:%S').isoformat()
            except ValueError:
                record['updated_at'] = '1970-01-01T00:00:00'  # Set to Unix epoch if parsing fails
        else:
            record['updated_at'] = '1970-01-01T00:00:00'  # Set to Unix epoch if not provided

        # Calculate account age in days (assumes join_date is in YYYY-MM-DD format)
        join_date = record.get('join_date')
        if join_date:
            try:
                join_date_obj = datetime.strptime(join_date, '%Y-%m-%d')
                record['account_age_days'] = (datetime.utcnow() - join_date_obj).days
            except ValueError:
                record['account_age_days'] = 0
        else:
            record['account_age_days'] = 0  # Default to 0 if join_date is missing

        # Handling missing or invalid values with defaults
        record['age'] = record.get('age', 0)
        record['account_balance'] = record.get('account_balance', 0.0)
        record['loyalty_points'] = record.get('loyalty_points', 0)
        record['last_login'] = record.get('last_login', '1970-01-01T00:00:00')  # Default to epoch if missing

        # Return JSON string with double quotes
        return json.dumps(record)

    except Exception as e:
        print(f"Error processing record: {e}")
        return None  # Handle errors appropriately
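A quick local check of transform_data; the sample record below is illustrative and simply mirrors the fields the mock generator produces:

# Illustrative usage: run the UDF on one sample record and print the result.
sample = json.dumps({
    "row_key": "abc-123",
    "name": "john doe",
    "email": "John.Doe@Example.COM",
    "is_active": 1,
    "loyalty_points": 750,
    "join_date": "2020-01-15",
    "inserted_at": "2024-06-01 10:00:00",
    "updated_at": None,
})
print(transform_data(sample))  # name title-cased, email lowercased, loyalty_status "Platinum"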
