diff --git a/databricks.yml b/databricks.yml index 03ccce6..de31443 100644 --- a/databricks.yml +++ b/databricks.yml @@ -84,11 +84,11 @@ variables: prod_env_sp_id: default: "my-sp-id-jfsdkjhfjsdhfkjh" -# qqqq will want later if many python files +# for external package maybe qqqq i dont think we will export there are poetry bundle example repos if we decide to # artifacts: - # python_artifact: - # type: whl - # build: uv build --wheel +# python_artifact: +# type: whl +# build: uv build --wheel # Deployment environments targets: diff --git a/pyproject.toml b/pyproject.toml index 1707b8a..940124b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,32 +1,33 @@ -[project] -name = "Workflow_POC" -version = "0.0.1" -authors = [{ name = "philip.tate@nhs.net" }] -requires-python = ">=3.10,<=3.13" -dependencies = [ - # Any dependencies for jobs and pipelines in this project can be added here - # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies - # - # LIMITATION: for pipelines, dependencies are cached during development; - # add dependencies to the 'environment' section of your pipeline.yml file instead -] - -[dependency-groups] -dev = [ - "pytest", - "databricks-dlt", - "databricks-connect>=15.4,<15.5", -] - -[project.scripts] -main = "Workflow_POC.main:main" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src"] - -[tool.black] -line-length = 125 +[project] +name = "Workflow_POC" +version = "0.0.1" +authors = [{ name = "philip.tate@nhs.net" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of your pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-dlt", + "databricks-connect>=15.4,<15.5", +] + +[project.scripts] +main = "Workflow_POC.main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +# Wheel packing came in the template :) qqqq +[tool.hatch.build.targets.wheel] +packages = ["src"] + +[tool.black] +line-length = 125 diff --git a/resources/pipeline/ods_ingestion.yml b/resources/pipeline/ods_ingestion.yml index 7c91c29..8a6a1ea 100644 --- a/resources/pipeline/ods_ingestion.yml +++ b/resources/pipeline/ods_ingestion.yml @@ -60,7 +60,9 @@ resources: # if doing a pipeline per layer would do something like # include: ../../src/ingestion/ - might work # include: ../../src/ingestion/*.py - doesnt work - include: ../../src/ingestion/ods_ingest.py + include: ../../src/ingestion/ods_ingest.py # -worked + # include: ../../src/**/*.py + #- folder: ../../src/utils photon: true # qqqq good practice to specify its something to do with dlt having beta version? channel: current diff --git a/src/ingestion/ods_ingest.py b/src/ingestion/ods_ingest.py index 9812f39..4abe78e 100644 --- a/src/ingestion/ods_ingest.py +++ b/src/ingestion/ods_ingest.py @@ -1,8 +1,21 @@ # for making spark tables # qqqq problem i am having is that we are setting the schema, and dev has schema set as user names # i want to use the databricks.yml schema name for dev, and for staging and prod i want to set it to bronze_ods in this script -from pyspark import pipelines as dp +# This does work, but dont know how the file is working generally when bundle files are not generated anymore, so its not working from within the bundle. +# ultimately probably still want to be in a wheel will see what happens with unit testing it +import sys +import os +sys.path.append(os.path.abspath('..')) + +### TRY ### +# import sys +# bundle_src_path = sys.argv[1] +# sys.path.append(bundle_src_path) + +from pyspark import pipelines as dp +#from utils.loaders import load_csv_table #use wheel instead +from utils.loaders import load_csv_table # Fixed System Constants these and some of this stuff should be going in a helper i think #ADLS_PROTOCOL = "abfss://" @@ -35,151 +48,264 @@ # domain_folder ?? qqqq folder_name = spark.conf.get("pipeline.domain") # ods #folder_location_path = f"{ADLS_PROTOCOL}{container}@{storage_account}{ADLS_SUFFIX}/{folder_name}/" -folder_location_path = f"{storage_container_path }/{folder_name}/" +folder_location_path = f"{storage_container_path}/{folder_name}/" # "abfss://bronze@unifiedrptdeltalake.dfs.core.windows.net/ods print(folder_location_path) +# qqqq this could be far simpler hardcode "import raw" the just list the names but we want it a bit more flexible + +# List of table names +TABLE_NAMES = [ + "Additional_Attributes_Details", + "Code_System_Details", + "Contact_Details", + "Manifest_Details", + "Organisation_Details", + "OtherID_Details", + "PrimaryRole_Details", + "Relationship_Details", + "Role_Details", + "Successor_Details", +] + + +# Define all ODS tables with their configurations +# tables as keys because unique +# only using dictionary because expect future use to have more to it + +# Generate table configurations dynamically +ODS_TABLES = { + table_name: { + "csv_filename": f"{table_name}.csv", + "comment": f"Import raw {table_name}" + } + for table_name in TABLE_NAMES +} + +# ODS_TABLES = { +# "Additional_Attributes_Details": { +# "csv_filename": "Additional_Attributes_Details.csv", +# "comment": "Import raw Additional_Attributes_Details" +# }, +# "Code_System_Details": { +# "csv_filename": "Code_System_Details.csv", +# "comment": "Import raw Code_System_Details" +# }, +# "Contact_Details": { +# "csv_filename": "Contact_Details.csv", +# "comment": "Import raw Contact_Details" +# }, +# "Manifest_Details": { +# "csv_filename": "Manifest_Details.csv", +# "comment": "Import raw Manifest_Details" +# }, +# "Organisation_Details": { +# "csv_filename": "Organisation_Details.csv", +# "comment": "Import raw Organisation_Details" +# }, +# "OtherID_Details": { +# "csv_filename": "OtherID_Details.csv", +# "comment": "Import raw OtherID_Details" +# }, +# "PrimaryRole_Details": { +# "csv_filename": "PrimaryRole_Details.csv", +# "comment": "Import raw PrimaryRole_Details" +# }, +# "Relationship_Details": { +# "csv_filename": "Relationship_Details.csv", +# "comment": "Import raw Relationship_Details" +# }, +# "Role_Details": { +# "csv_filename": "Role_Details.csv", +# "comment": "Import raw Role_Details" +# }, +# "Successor_Details": { +# "csv_filename": "Successor_Details.csv", +# "comment": "Import raw Successor_Details" +# }, +# } + +## get from wheel +# def load_csv_table(base_path, csv_filename): +# """Load CSV from Azure storage with standard options""" +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load(f"{base_path}{csv_filename}") +# ) + +# Create DLT tables dynamically +for table_name, config in ODS_TABLES.items(): + # Create a closure to capture the current loop variables + def create_table(name=table_name, cfg=config): + @dp.table(name=name, comment=cfg["comment"]) + def table_loader(): + return load_csv_table(folder_location_path, cfg["csv_filename"]) + return table_loader + + create_table() + + +# def load_csv_table(table_name): +# """Load CSV from Azure storage with standard options""" +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load(f"{folder_location_path}{table_name}.csv") +# ) + +# # Create DLT tables dynamically +# for table_name, comment in ODS_TABLES: +# # Create a closure to capture the current loop variables +# def create_table(name=table_name, desc=comment): +# @dp.table(name=name, comment=desc) +# def table_loader(): +# return load_csv_table(name) +# return table_loader + +# create_table() +# @dp.table( +# # qqqq was f"{schema_name}.Additional_Attributes_Details" but worked before now need to do it this way???!!! +# name="Additional_Attributes_Details", +# comment="Import raw Additional_Attributes_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Additional_Attributes_Details.csv" +# ) +# ) + +# @dp.table( +# name="Code_System_Details", +# comment="Import raw Code_System_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Code_System_Details.csv" +# ) +# ) -@dp.table( - # qqqq was f"{schema_name}.Additional_Attributes_Details" but worked before now need to do it this way???!!! - name="Additional_Attributes_Details", - comment="Import raw Additional_Attributes_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Additional_Attributes_Details.csv" - ) - ) - -@dp.table( - name="Code_System_Details", - comment="Import raw Code_System_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Code_System_Details.csv" - ) - ) - -@dp.table( - name="Contact_Details", - comment="Import raw Contact_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Contact_Details.csv" - ) - ) - -@dp.table( - name="Manifest_Details", - comment="Import raw Manifest_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Manifest_Details.csv" - ) - ) - -@dp.table( - name="Organisation_Details", - comment="Import raw Organisation_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Organisation_Details.csv" - ) - ) - -@dp.table( - name="OtherID_Details", - comment="Import raw OtherID_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}OtherID_Details.csv" - ) - ) - -@dp.table( - name="PrimaryRole_Details", - comment="Import raw PrimaryRole_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}PrimaryRole_Details.csv" - ) - ) - -@dp.table( - name="Relationship_Details", - comment="Import raw Relationship_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Relationship_Details.csv" - ) - ) - -@dp.table( - name="Role_Details", - comment="Import raw Role_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Role_Details.csv" - ) - ) - -@dp.table( - name="Successor_Details", - comment="Import raw Successor_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Successor_Details.csv" - ) - ) +# @dp.table( +# name="Contact_Details", +# comment="Import raw Contact_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Contact_Details.csv" +# ) +# ) + +# @dp.table( +# name="Manifest_Details", +# comment="Import raw Manifest_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Manifest_Details.csv" +# ) +# ) + +# @dp.table( +# name="Organisation_Details", +# comment="Import raw Organisation_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Organisation_Details.csv" +# ) +# ) + +# @dp.table( +# name="OtherID_Details", +# comment="Import raw OtherID_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}OtherID_Details.csv" +# ) +# ) + +# @dp.table( +# name="PrimaryRole_Details", +# comment="Import raw PrimaryRole_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}PrimaryRole_Details.csv" +# ) +# ) + +# @dp.table( +# name="Relationship_Details", +# comment="Import raw Relationship_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Relationship_Details.csv" +# ) +# ) + +# @dp.table( +# name="Role_Details", +# comment="Import raw Role_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Role_Details.csv" +# ) +# ) + +# @dp.table( +# name="Successor_Details", +# comment="Import raw Successor_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Successor_Details.csv" +# ) +# ) ###### Try this diff --git a/src/utils/.gitinclude b/src/utils/.gitinclude deleted file mode 100644 index e69de29..0000000 diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..df638bb --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,3 @@ +from utils.loaders import load_csv_table + +__all__ = ["load_csv_table"] \ No newline at end of file diff --git a/src/utils/loaders.py b/src/utils/loaders.py new file mode 100644 index 0000000..6fc0719 --- /dev/null +++ b/src/utils/loaders.py @@ -0,0 +1,21 @@ +"""Data loading utilities for Databricks pipelines""" +# i dont think we will want these as a package just as a module we wont be expoting and its just and extra steps for analyst which currently i do not think will provide value until they request it and will get in their way +def load_csv_table(base_path, csv_filename): + """Load CSV from Azure storage with standard options + + Args: + base_path: Base path to the folder containing CSV files + csv_filename: Name of the CSV file to load + + Returns: + DataFrame: Spark DataFrame with CSV data + """ + from pyspark.sql import SparkSession + spark = SparkSession.builder.getOrCreate() + + return ( + spark.read.format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(f"{base_path}{csv_filename}") + ) \ No newline at end of file