Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ on:

## TODO: run tests, deploy the DAB, then tell the dev repo folder on Databricks to pull (so it stays in sync with its DAB — or should this pipeline deploy the DAB directly? Apparently Git folders are intended for reference only.)
jobs:
todo:
runs-on: ubuntu-latest
steps:
- name: TODO
run: |
echo "TODO: Implement Databricks Dev Shared deployment pipeline"
54 changes: 27 additions & 27 deletions .github/workflows/prod-cd.yml
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
# No code-quality checks here — staging has already run them
name: Deploy to Production Databricks Workspace
on:
push:
branches:
- main
jobs:
deploy_prod:
name: "Deploy Bundle to Production Environment"
runs-on: ubuntu-latest
needs: testing
environment: prod
env:
DATABRICKS_HOST: ${{ vars.DBX_HOST }}
DATABRICKS_CLIENT_ID: ${{ vars.DBX_SP_ID }}
DATABRICKS_CLIENT_SECRET: ${{ secrets.DBX_SP_SECRET }}
steps:
# qqqq add version and changelog creation step, and give a dab version matching repo version
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- name: Deploy bundle
run: databricks bundle deploy -t prod --auto-approve
# No code-quality checks here — staging has already run them

name: Deploy to Production Databricks Workspace

on:
push:
branches:
- main

jobs:

deploy_prod:
name: "Deploy Bundle to Production Environment"
runs-on: ubuntu-latest
needs: testing
environment: prod
env:
DATABRICKS_HOST: ${{ vars.DBX_HOST }}
DATABRICKS_CLIENT_ID: ${{ vars.DBX_SP_ID }}
DATABRICKS_CLIENT_SECRET: ${{ secrets.DBX_SP_SECRET }}

steps:
# qqqq add version and changelog creation step, and give a dab version matching repo version
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- name: Deploy bundle
run: databricks bundle deploy -t prod --auto-approve
working-directory: .
86 changes: 86 additions & 0 deletions scratch-gitincludeforpoc/deleteme.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "7d5797ba-0d16-44b0-9419-7eaabc2441ee",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"from pyspark.sql import functions as fn\n",
"\n",
"def working_days_calendar(spark):\n",
" \"\"\"\n",
" Create a calendar DataFrame with monthly working days.\n",
" Covers 10 years from 2020-01-01.\n",
" Excludes weekends as non-working days.\n",
" Prints sample data for debugging.\n",
" \"\"\"\n",
" \n",
" df = spark.range(0, 365 * 10).select(\n",
" fn.expr(\"date_add('2020-01-01', cast(id as int))\").alias(\"Date\")\n",
" ).withColumn(\n",
" \"Month_Start\", fn.trunc(\"Date\", \"MM\")\n",
" ).withColumn(\n",
" \"Working_Day_Type\",\n",
" fn.when(fn.date_format(\"Date\", \"EEEE\").isin(\"Saturday\", \"Sunday\"), \"N\").otherwise(\"Y\")\n",
" ).withColumn(\n",
" \"Working_Day_Calc\",\n",
" fn.when(fn.col(\"Working_Day_Type\") == \"Y\", 1).otherwise(0)\n",
" )\n",
"\n",
" # DEBUG: Print the schema and a small sample of data\n",
" print(\"\\n=== Calendar Schema ===\")\n",
" df.printSchema()\n",
" print(\"\\n=== Calendar Sample Data ===\")\n",
" df.orderBy(\"Date\").show(12, truncate=False)\n",
"\n",
" # Optional: Aggregate by month to get total working days\n",
" cal_agg = df.groupBy(\"Month_Start\").agg(\n",
" fn.sum(\"Working_Day_Calc\").alias(\"Total_Working_Days\")\n",
" )\n",
"\n",
" # DEBUG: Print aggregated result\n",
" print(\"\\n=== Aggregated Calendar Sample ===\")\n",
" cal_agg.orderBy(\"Month_Start\").show(12, truncate=False)\n",
"\n",
" return cal_agg\n",
"\n",
"calendar_df = working_days_calendar(spark)\n"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": {
"base_environment": "",
"environment_version": "4"
},
"inputWidgetPreferences": null,
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 4
},
"notebookName": "deleteme",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
88 changes: 48 additions & 40 deletions src/transformations/date_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,30 @@
from pyspark.sql import functions as fn # Added

## def working_days_monthly(df, col_start_date): # Original
def working_days_monthly(spark, df, col_start_date):
## adding spark and the calendar so that we separate concerns and make it testable
## single concern
## passing in df so we don't need to pass spark, but often we may want to
## def working_days_monthly(spark, df, col_start_date, cal_df):

def working_days_monthly(working_days_cal_df, df, col_start_date):
"""
Description: Adds a column to monthly level data with working days, based on the first day of the month.

Parameters:
working_days_cal_df: the DI working-days calendar DataFrame (one row per date, with Month_Start / Working_Day_Calc columns)
df: the dataframe to perform the transformation on
col_start_date: the column that contains the first date of the month in your dataframe

Returns:
output: the initial dataframe with the new column added with working days for the month
"""

## Added because i dont have the global file
calendar_full = "/tmp/standard_calendar.parquet"

# 1. RELIABLE CHECK: Does the file exist?
file_exists = False
try:
dbutils.fs.ls(calendar_full)
file_exists = True
except:
file_exists = False

# 2. GENERATE if missing
if not file_exists:
print(f"Creating missing calendar at {calendar_full}...")
spark.range(0, 365 * 10).select(
fn.expr("date_add('2020-01-01', cast(id as int))").alias("Date")
).withColumn("Month_Start", fn.trunc("Date", "MM")) \
.withColumn("Working_Day_Type", fn.when(fn.date_format("Date", "EEEE").isin("Saturday", "Sunday"), "N").otherwise("Y")) \
.withColumn("Working_Day_Calc", fn.when(fn.col("Working_Day_Type") == "Y", 1).otherwise(0)) \
.write.mode("overwrite").parquet(calendar_full)

# 3. READ (This will now definitely work)
cal_df = spark.read.parquet(calendar_full) # This is a global file i dont have

cal_df = cal_df.filter(fn.col("Working_Day_Type") == "Y").dropDuplicates()

cal_df = cal_df.withColumn(
"Month_Start",
fn.date_format(fn.to_date(fn.col("Month_Start"), "yyyy-MM-dd"), "yyyy-MM-dd").cast("date")
)

# Aggregate by Month_Start and sum Working_Day_Calc
cal_df = cal_df.groupBy(
"Month_Start"
).agg(
fn.sum("Working_Day_Calc").alias("Total_Working_Days")
)

# Convert start date in df to date format for comparison
df = df.withColumn(col_start_date, fn.to_date(fn.col(col_start_date).cast("date")))

# Create a new column with the total working days
output = df.join(
cal_df.select("Month_Start", "Total_Working_Days"),
working_days_cal_df.select("Month_Start", "Total_Working_Days"),
fn.col(col_start_date) == fn.col("Month_Start"),
"left"
).withColumn(
Expand All @@ -67,3 +35,43 @@ def working_days_monthly(spark, df, col_start_date):
).drop("Month_Start", "Total_Working_Days")

return output


############### keeping just because poc
# ## Added because i dont have the global file
# calendar_full = "/tmp/standard_calendar.parquet"

# # 1. RELIABLE CHECK: Does the file exist?
# file_exists = False
# try:
# dbutils.fs.ls(calendar_full)
# file_exists = True
# except:
# file_exists = False

# # 2. GENERATE if missing
# if not file_exists:
# print(f"Creating missing calendar at {calendar_full}...")
# spark.range(0, 365 * 10).select(
# fn.expr("date_add('2020-01-01', cast(id as int))").alias("Date")
# ).withColumn("Month_Start", fn.trunc("Date", "MM")) \
# .withColumn("Working_Day_Type", fn.when(fn.date_format("Date", "EEEE").isin("Saturday", "Sunday"), "N").otherwise("Y")) \
# .withColumn("Working_Day_Calc", fn.when(fn.col("Working_Day_Type") == "Y", 1).otherwise(0)) \
# .write.mode("overwrite").parquet(calendar_full)

# # 3. READ (This will now definitely work)
# cal_df = spark.read.parquet(calendar_full) # This is a global file i dont have

# working_days_cal_df = working_days_cal_df.filter(fn.col("Working_Day_Type") == "Y").dropDuplicates()

# working_days_cal_df = working_days_cal_df.withColumn(
# "Month_Start",
# fn.date_format(fn.to_date(fn.col("Month_Start"), "yyyy-MM-dd"), "yyyy-MM-dd").cast("date")
# )

# Aggregate by Month_Start and sum Working_Day_Calc
# working_days_cal_df = working_days_cal_df.groupBy(
# "Month_Start"
# ).agg(
# fn.sum("Working_Day_Calc").alias("Total_Working_Days")
# )
3 changes: 1 addition & 2 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# QQQQ forcing a fail to make sure pipelines fail
# .loaders import load_csv_table
from .loaders import load_csv_table
Loading