diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63b7205..fba5fa1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,56 +1,56 @@ -# We want to run CI processes that can run independent of databricks as branch rules so that we dont # deploy at cost code that we already should know needs changing -# such as linting, and unit test for python, maybe dab? verify -# we run these on all pull request because if there is a hot fix it may not have passed through -# staging for example -# qqqq check this is up to date -name: CI - Pull Request Checks - -# Run CI on all pull requests -on: - pull_request: - branches: - - '**' # all branches - -jobs: - ci_checks: - name: "Linting, Unit Tests, DAB Verify" - runs-on: ubuntu-latest - - steps: - # Checkout code - - name: Checkout repository - uses: actions/checkout@v4 - - # Set up Python - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: "3.x" - - # Install dependencies used for linting and unit tests - - name: Install dependencies - run: pip install -r requirements-dev.txt - - # Run python unit tests - - name: Run Unit Tests - run: pytest tests/unit - - # Run python lint - # qqqq on example used flake8 instead - # pyproject.toml will need configuring - - name: Run Linting - run: pylint src - - # qqqq to do run commit lint step and put in commit lint config - # see TELBlazor - - name: Commit lint - run: | - echo "Commit lint not implemented" - exit 1 - - # qqqq to do run version generation step and put in commit lint config - # see TELBlazor - - name: Version Generation Test Run - run: | - echo "Version test run not implemented" - exit 1 +# We want to run CI processes that can run independent of databricks as branch rules so that we dont # deploy at cost code that we already should know needs changing +# such as linting, and unit test for python, maybe dab? verify +# we run these on all pull request because if there is a hot fix it may not have passed through +# staging for example +# qqqq check this is up to date +name: CI - Pull Request Checks + +# Run CI on all pull requests +on: + pull_request: + branches: + - '**' # all branches + +jobs: + ci_checks: + name: "Linting, Unit Tests, DAB Verify" + runs-on: ubuntu-latest + + steps: + # Checkout code + - name: Checkout repository + uses: actions/checkout@v4 + + # Set up Python + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + + # Install dependencies used for linting and unit tests + - name: Install dependencies + run: pip install -r requirements-dev.txt + + # Run python unit tests + - name: Run Unit Tests + run: pytest tests/unit + + # Run python lint + # qqqq on example used flake8 instead + # pyproject.toml will need configuring + - name: Run Linting + run: pylint src + + # qqqq to do run commit lint step and put in commit lint config + # see TELBlazor + - name: Commit lint + run: | + echo "Commit lint not implemented" + exit 1 + + # qqqq to do run version generation step and put in commit lint config + # see TELBlazor + - name: Version Generation Test Run + run: | + echo "Version test run not implemented" + exit 1 diff --git a/.github/workflows/manual-trigger-test-poc.yml b/.github/workflows/manual-trigger-test-poc.yml new file mode 100644 index 0000000..b02d834 --- /dev/null +++ b/.github/workflows/manual-trigger-test-poc.yml @@ -0,0 +1,29 @@ +name: Manual test run (PoC) + +on: + workflow_dispatch: + +jobs: + pytest: + runs-on: ubuntu-latest + + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Upgrade pip + run: python -m pip install --upgrade pip + + - name: Install project + test deps + run: | + pip install -r requirements-dev.txt + pip install -e . + + - name: Run pytest (exclude Databricks tests) + run: | + pytest -m "not databricks" -v \ No newline at end of file diff --git a/.gitignore b/.gitignore index 840b1da..37b748f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,47 +1,47 @@ -# Couldnt find an official gitignore this is AI generated -# ----------------------------- -# Databricks / DAB / dbx -# ----------------------------- -.databricks/ # local workspace metadata / CLI files -.deploy/ # local deploy cache (dbx/DAB) -.bundle/ # local bundle files (dbx/DAB) -*.log # temporary logs -*.tmp # temporary files -dbx_project.yaml.bak # backup of bundle config -build/ -dist/ - -# ----------------------------- -# Python -# ----------------------------- -__pycache__/ -*.pyc -*.pyo -*.pyd -*.egg-info/ -.venv/ -env/ -pip-selfcheck.json - -# ----------------------------- -# Jupyter Notebooks -# ----------------------------- -.ipynb_checkpoints/ - -# ----------------------------- -# Scratch / experimental folder -# ----------------------------- -scratch/** # ignore all files in scratch -!scratch/README.md # except placeholder README.md - -# ----------------------------- -# IDE / editor -# ----------------------------- -.vscode/ -.idea/ - -# ----------------------------- -# OS / system -# ----------------------------- -.DS_Store +# Couldnt find an official gitignore this is AI generated +# ----------------------------- +# Databricks / DAB / dbx +# ----------------------------- +.databricks/ # local workspace metadata / CLI files +.deploy/ # local deploy cache (dbx/DAB) +.bundle/ # local bundle files (dbx/DAB) +*.log # temporary logs +*.tmp # temporary files +dbx_project.yaml.bak # backup of bundle config +build/ +dist/ + +# ----------------------------- +# Python +# ----------------------------- +__pycache__/ +*.pyc +*.pyo +*.pyd +*.egg-info/ +.venv/ +env/ +pip-selfcheck.json + +# ----------------------------- +# Jupyter Notebooks +# ----------------------------- +.ipynb_checkpoints/ + +# ----------------------------- +# Scratch / experimental folder +# ----------------------------- +scratch/** # ignore all files in scratch +!scratch/README.md # except placeholder README.md + +# ----------------------------- +# IDE / editor +# ----------------------------- +.vscode/ +.idea/ + +# ----------------------------- +# OS / system +# ----------------------------- +.DS_Store Thumbs.db \ No newline at end of file diff --git a/conftest.py-comebackto.txt b/conftest.py-comebackto.txt new file mode 100644 index 0000000..fb99be0 --- /dev/null +++ b/conftest.py-comebackto.txt @@ -0,0 +1,99 @@ +# copy paste from [Dab repo examples](https://github.com/databricks/bundle-examples/blob/1cf3dba30a897d68e3e74ab17f0a3dff68392f15/default_python/tests/conftest.py) +"""This file configures pytest. + +This file is in the root since it can be used for tests in any place in this +project, including tests under resources/. +""" + +import os, sys, pathlib +from contextlib import contextmanager + + +try: + from databricks.connect import DatabricksSession + from databricks.sdk import WorkspaceClient + from pyspark.sql import SparkSession + import pytest + import json + import csv + import os +except ImportError: + raise ImportError( + "Test dependencies not found.\n\nRun tests using 'uv run pytest'. See http://docs.astral.sh/uv to learn more about uv." + ) + + +@pytest.fixture() +def spark() -> SparkSession: + """Provide a SparkSession fixture for tests. + + Minimal example: + def test_uses_spark(spark): + df = spark.createDataFrame([(1,)], ["x"]) + assert df.count() == 1 + """ + return DatabricksSession.builder.getOrCreate() + + +@pytest.fixture() +def load_fixture(spark: SparkSession): + """Provide a callable to load JSON or CSV from fixtures/ directory. + + Example usage: + + def test_using_fixture(load_fixture): + data = load_fixture("my_data.json") + assert data.count() >= 1 + """ + + def _loader(filename: str): + path = pathlib.Path(__file__).parent.parent / "fixtures" / filename + suffix = path.suffix.lower() + if suffix == ".json": + rows = json.loads(path.read_text()) + return spark.createDataFrame(rows) + if suffix == ".csv": + with path.open(newline="") as f: + rows = list(csv.DictReader(f)) + return spark.createDataFrame(rows) + raise ValueError(f"Unsupported fixture type for: {filename}") + + return _loader + + +def _enable_fallback_compute(): + """Enable serverless compute if no compute is specified.""" + conf = WorkspaceClient().config + if conf.serverless_compute_id or conf.cluster_id or os.environ.get("SPARK_REMOTE"): + return + + url = "https://docs.databricks.com/dev-tools/databricks-connect/cluster-config" + print("โ˜๏ธ no compute specified, falling back to serverless compute", file=sys.stderr) + print(f" see {url} for manual configuration", file=sys.stdout) + + os.environ["DATABRICKS_SERVERLESS_COMPUTE_ID"] = "auto" + + +@contextmanager +def _allow_stderr_output(config: pytest.Config): + """Temporarily disable pytest output capture.""" + capman = config.pluginmanager.get_plugin("capturemanager") + if capman: + with capman.global_and_fixture_disabled(): + yield + else: + yield + + +def pytest_configure(config: pytest.Config): + """Configure pytest session.""" + with _allow_stderr_output(config): + _enable_fallback_compute() + + # Initialize Spark session eagerly, so it is available even when + # SparkSession.builder.getOrCreate() is used. For DB Connect 15+, + # we validate version compatibility with the remote cluster. + if hasattr(DatabricksSession.builder, "validateSession"): + DatabricksSession.builder.validateSession().getOrCreate() + else: + DatabricksSession.builder.getOrCreate() \ No newline at end of file diff --git a/conftest.py-disablefornow b/conftest.py-disablefornow new file mode 100644 index 0000000..fb99be0 --- /dev/null +++ b/conftest.py-disablefornow @@ -0,0 +1,99 @@ +# copy paste from [Dab repo examples](https://github.com/databricks/bundle-examples/blob/1cf3dba30a897d68e3e74ab17f0a3dff68392f15/default_python/tests/conftest.py) +"""This file configures pytest. + +This file is in the root since it can be used for tests in any place in this +project, including tests under resources/. +""" + +import os, sys, pathlib +from contextlib import contextmanager + + +try: + from databricks.connect import DatabricksSession + from databricks.sdk import WorkspaceClient + from pyspark.sql import SparkSession + import pytest + import json + import csv + import os +except ImportError: + raise ImportError( + "Test dependencies not found.\n\nRun tests using 'uv run pytest'. See http://docs.astral.sh/uv to learn more about uv." + ) + + +@pytest.fixture() +def spark() -> SparkSession: + """Provide a SparkSession fixture for tests. + + Minimal example: + def test_uses_spark(spark): + df = spark.createDataFrame([(1,)], ["x"]) + assert df.count() == 1 + """ + return DatabricksSession.builder.getOrCreate() + + +@pytest.fixture() +def load_fixture(spark: SparkSession): + """Provide a callable to load JSON or CSV from fixtures/ directory. + + Example usage: + + def test_using_fixture(load_fixture): + data = load_fixture("my_data.json") + assert data.count() >= 1 + """ + + def _loader(filename: str): + path = pathlib.Path(__file__).parent.parent / "fixtures" / filename + suffix = path.suffix.lower() + if suffix == ".json": + rows = json.loads(path.read_text()) + return spark.createDataFrame(rows) + if suffix == ".csv": + with path.open(newline="") as f: + rows = list(csv.DictReader(f)) + return spark.createDataFrame(rows) + raise ValueError(f"Unsupported fixture type for: {filename}") + + return _loader + + +def _enable_fallback_compute(): + """Enable serverless compute if no compute is specified.""" + conf = WorkspaceClient().config + if conf.serverless_compute_id or conf.cluster_id or os.environ.get("SPARK_REMOTE"): + return + + url = "https://docs.databricks.com/dev-tools/databricks-connect/cluster-config" + print("โ˜๏ธ no compute specified, falling back to serverless compute", file=sys.stderr) + print(f" see {url} for manual configuration", file=sys.stdout) + + os.environ["DATABRICKS_SERVERLESS_COMPUTE_ID"] = "auto" + + +@contextmanager +def _allow_stderr_output(config: pytest.Config): + """Temporarily disable pytest output capture.""" + capman = config.pluginmanager.get_plugin("capturemanager") + if capman: + with capman.global_and_fixture_disabled(): + yield + else: + yield + + +def pytest_configure(config: pytest.Config): + """Configure pytest session.""" + with _allow_stderr_output(config): + _enable_fallback_compute() + + # Initialize Spark session eagerly, so it is available even when + # SparkSession.builder.getOrCreate() is used. For DB Connect 15+, + # we validate version compatibility with the remote cluster. + if hasattr(DatabricksSession.builder, "validateSession"): + DatabricksSession.builder.validateSession().getOrCreate() + else: + DatabricksSession.builder.getOrCreate() \ No newline at end of file diff --git a/databricks.yml b/databricks.yml index 03ccce6..de31443 100644 --- a/databricks.yml +++ b/databricks.yml @@ -84,11 +84,11 @@ variables: prod_env_sp_id: default: "my-sp-id-jfsdkjhfjsdhfkjh" -# qqqq will want later if many python files +# for external package maybe qqqq i dont think we will export there are poetry bundle example repos if we decide to # artifacts: - # python_artifact: - # type: whl - # build: uv build --wheel +# python_artifact: +# type: whl +# build: uv build --wheel # Deployment environments targets: diff --git a/devops/README.md b/devops/README.md new file mode 100644 index 0000000..b4ee10e --- /dev/null +++ b/devops/README.md @@ -0,0 +1,5 @@ +# Development Deployment + +It would be nice without the terminal and without needing to push to github to trigger unit tests, bundle validation, and bundle deployment for the local development user areas. + +This doesnt seem do-able with a notebook, and enabling the terminal is an option, so using the databrick.yml ui deploy, and remembering to triggered any unit tests seems like it will be the process for now. \ No newline at end of file diff --git a/notebooks/notebook_better_secrets.ipynb b/notebooks/notebook_better_secrets.ipynb new file mode 100644 index 0000000..757e515 --- /dev/null +++ b/notebooks/notebook_better_secrets.ipynb @@ -0,0 +1,284 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "2211afe9-d3ee-48ab-b94c-e9cbd4fd7cad", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Notebook advantages\n", + "Has dbutils and spark\n", + "# Issues\n", + "Pathing is awful\n", + " - EOE do have a function to help\n", + " - DBX suggests wheels which is more technical\n", + "Hardcoding things dont want to so need to add them to some secrets\n", + "Spark may not run in pytest but is running here\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "bdf58cb9-497d-41ea-a320-4d427cd51727", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "import sys, os\n", + "# we need better routing\n", + "# toml providing it - only for test and once loaded will work until session refresh\n", + "PROJECT_ROOT = \"/Workspace/Users/philip.tate@nhs.net/PT Separate Feature Branch/src\"\n", + "sys.path.insert(0, PROJECT_ROOT)\n", + "\n", + "print(\"Project root:\", PROJECT_ROOT)\n", + "print(\"Contents:\", os.listdir(PROJECT_ROOT))\n", + "\n", + "from utils import load_csv_table\n", + "\n", + "\n", + "# Example configuration\n", + "layer = \"bronze\"\n", + "domain = \"ods\"\n", + "# had to do via command line locally need azure key vault access\n", + "storage_account_url = dbutils.secrets.get(scope=\"poc-secrets\", key=\"storage_account_url\")\n", + "base_path = f\"abfss://{layer}{storage_account_url}/{domain}/\"\n", + "csv_filename = \"Contact_Details.csv\"\n", + "\n", + "# Load the CSV\n", + "#df = spark.read.option('header', 'true').csv(os.path.join(base_path, csv_filename))\n", + "df = load_csv_table(spark, base_path, csv_filename)\n", + "# Preview\n", + "df.show(5)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5d458372-8f7e-4730-b1cc-758c3e50a499", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Just better secret storage\n", + " - # Example configuration\n", + "layer = \"bronze\"\n", + "domain = \"ods\"\n", + "# had to do via command line locally need azure key vault access\n", + "storage_account_url = dbutils.secrets.get(scope=\"poc-secrets\", key=\"storage_account_url\")\n", + "base_path = f\"abfss://{layer}{storage_account_url}/{domain}/\"\n", + "csv_filename = \"Contact_Details.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a98cac63-2105-4549-918d-1392fae31c97", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# List all secret scopes available in your workspace\n", + "# scopes = dbutils.secrets.listScopes()\n", + "# for scope in scopes:\n", + "# print(scope.name)\n", + "\n", + "# keys = dbutils.secrets.list(\"UnifiedReportingDevKeyVault\")\n", + "# for key in keys:\n", + "# print(key.key)\n", + "\n", + "keys = dbutils.secrets.list(\"poc-secrets\")\n", + "for key in keys:\n", + " print(key.key)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "bde3d03b-f58b-45e9-8b7d-b366023c2ce7", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Test logic\n", + "First we need to just know can compare against a DF before we worry about test detection\n", + "\n", + "Then we need pytest" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3dcd99c3-da1a-42c1-b65d-a4ab4e442205", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "\"\"\"Simple test for load_csv_table - Databricks UI version\"\"\"\n", + "\n", + "from pyspark.sql import SparkSession\n", + "import sys\n", + "\n", + "# Routing\n", + "PROJECT_ROOT = \"/Workspace/Users/philip.tate@nhs.net/PT Separate Feature Branch/src\"\n", + "sys.path.insert(0, PROJECT_ROOT)\n", + "\n", + "from utils import load_csv_table\n", + "\n", + "\n", + "def test_load_csv_table():\n", + " \"\"\"Test that load_csv_table loads CSV correctly\"\"\"\n", + " \n", + " print(\"๐Ÿงช Starting test...\")\n", + " \n", + " # Get Spark session\n", + " spark = SparkSession.getActiveSession()\n", + " \n", + " # Create test data\n", + " data = [\n", + " (\"ORG001\", \"Test Hospital\", \"Active\"),\n", + " (\"ORG002\", \"Test Clinic\", \"Inactive\"),\n", + " (\"ORG003\", \"Test Surgery\", \"Active\")\n", + " ]\n", + " columns = [\"OrganisationID\", \"Name\", \"Status\"]\n", + " \n", + " test_df = spark.createDataFrame(data, columns)\n", + " \n", + " # Use DBFS temp location (works better in Databricks UI)\n", + " import random\n", + " test_id = random.randint(10000, 99999)\n", + " temp_path = f\"/tmp/test_csv_{test_id}\"\n", + " \n", + " print(f\"๐Ÿ“ Writing test data to: {temp_path}\")\n", + " \n", + " try:\n", + " # Write test data\n", + " test_df.coalesce(1).write.csv(\n", + " temp_path,\n", + " header=True,\n", + " mode=\"overwrite\"\n", + " )\n", + " \n", + " # Find the actual CSV file (Spark creates part-*.csv)\n", + " files = dbutils.fs.ls(temp_path)\n", + " csv_file = [f for f in files if f.name.startswith(\"part-\") and f.name.endswith(\".csv\")][0]\n", + " \n", + " # Rename to expected filename\n", + " new_path = f\"{temp_path}/Contact_Details.csv\"\n", + " dbutils.fs.mv(csv_file.path, new_path)\n", + " \n", + " # TEST: Load the CSV\n", + " base_path = temp_path + \"/\"\n", + " csv_filename = \"Contact_Details.csv\"\n", + " \n", + " print(f\"๐Ÿ“‚ Loading from: {base_path}{csv_filename}\")\n", + " df = load_csv_table(spark, base_path, csv_filename)\n", + " \n", + " # Verify results\n", + " print(f\"โœ“ DataFrame created: {df is not None}\")\n", + " print(f\"โœ“ Row count: {df.count()} (expected 3)\")\n", + " print(f\"โœ“ Columns: {df.columns}\")\n", + " \n", + " rows = df.collect()\n", + " print(f\"โœ“ First row data: {rows[0]['OrganisationID']}, {rows[0]['Name']}, {rows[0]['Status']}\")\n", + " \n", + " # Show the data\n", + " print(\"\\n๐Ÿ“Š Loaded data:\")\n", + " df.show()\n", + " \n", + " # Assertions\n", + " assert df is not None, \"โŒ DataFrame is None\"\n", + " assert df.count() == 3, f\"โŒ Expected 3 rows, got {df.count()}\"\n", + " assert \"OrganisationID\" in df.columns, \"โŒ Missing OrganisationID column\"\n", + " assert \"Name\" in df.columns, \"โŒ Missing Name column\"\n", + " assert \"Status\" in df.columns, \"โŒ Missing Status column\"\n", + " assert rows[0][\"OrganisationID\"] == \"ORG001\", \"โŒ Wrong data in first row\"\n", + " \n", + " print(\"\\nโœ… ALL TESTS PASSED!\")\n", + " \n", + " return df\n", + " \n", + " finally:\n", + " # Clean up test data\n", + " try:\n", + " print(f\"\\n๐Ÿงน Cleaning up: {temp_path}\")\n", + " dbutils.fs.rm(temp_path, recurse=True)\n", + " print(\"โœ“ Cleanup complete\")\n", + " except Exception as e:\n", + " print(f\"โš ๏ธ Cleanup failed (not critical): {e}\")\n", + "\n", + "\n", + "# ๐Ÿš€ RUN THE TEST NOW\n", + "test_load_csv_table()" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "notebook_better_secrets", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/notebooks/run_working_days_example.ipynb b/notebooks/run_working_days_example.ipynb new file mode 100644 index 0000000..9c2422e --- /dev/null +++ b/notebooks/run_working_days_example.ipynb @@ -0,0 +1,75 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "39ad4a5b-480a-43a9-a61b-22286ded55a5", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# This file is just to see another databricks projects function being used because i dont have the access to the global file the function has an added bit for making the file on the fly\n", + "\n", + "\n", + "# The only \"setup\" line you should need\n", + "import sys, os\n", + "# we need better routing\n", + "# toml providing it - only for test and once loaded will work until session refresh\n", + "PROJECT_ROOT = \"/Workspace/Users/philip.tate@nhs.net/PT Separate Feature Branch/src\"\n", + "sys.path.insert(0, PROJECT_ROOT)\n", + "\n", + "from transformations import working_days_monthly\n", + "\n", + "# COMMAND ----------\n", + "# 2. Setup some dummy data to see if it works\n", + "data = [(\"2023-01-01\",), (\"2023-02-01\",)]\n", + "columns = [\"start_date\"]\n", + "sample_df = spark.createDataFrame(data, columns)\n", + "col_start_date = \"start_date\"\n", + "\n", + "\n", + "# COMMAND ----------\n", + "# 4. Run the function\n", + "# Notice we pass 'spark' (the global session) into the function\n", + "output_df = working_days_monthly(spark, sample_df, col_start_date)\n", + "\n", + "# COMMAND ----------\n", + "# 5. View the result\n", + "display(output_df)\n", + "\n" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "run_working_days_example", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/notebooks/unit-tests-del/utils/run_tests.py-worked b/notebooks/unit-tests-del/utils/run_tests.py-worked new file mode 100644 index 0000000..78f81f1 --- /dev/null +++ b/notebooks/unit-tests-del/utils/run_tests.py-worked @@ -0,0 +1,14 @@ + +import sys +import pytest + +# Prevent __pycache__ creation +sys.dont_write_bytecode = True + +# Run your test file +pytest.main([ + "test_loader_5.py", # your test file + "-v", # verbose + "-s", # show print output + "--tb=short" # short tracebacks +]) diff --git a/notebooks/unit-tests-del/utils/test_loader_5.py-worked b/notebooks/unit-tests-del/utils/test_loader_5.py-worked new file mode 100644 index 0000000..221f8d4 --- /dev/null +++ b/notebooks/unit-tests-del/utils/test_loader_5.py-worked @@ -0,0 +1,268 @@ +import sys, os +import shutil +import random +import pytest +from pathlib import Path +from pyspark.sql import SparkSession +import random +from pyspark.dbutils import DBUtils + + +############################### +## This is more integration now anyway i need a core function test for unit +## Spark and dbutils are not going to work in git +## Routing wont work in git actions +## Need to move things into testing config +## need to move thing into toml +## file needs to be in correct location +############################## + +# we need better routing +PROJECT_ROOT = "/Workspace/Users/philip.tate@nhs.net/PT Separate Feature Branch/src" +sys.path.insert(0, PROJECT_ROOT) + +from utils import load_csv_table + + +###################################### +### PyTest doesnt have access to spark and dbutils in the same way so we are checking the test setup here ### +###################################### + +# Use existing Spark session in Databricks (Spark Connect) +@pytest.fixture(scope="session") +def spark(): + session = SparkSession.getActiveSession() + if session is None: + raise RuntimeError("No active Spark session found. Ensure you are running in Databricks.") + return session + +def test_load_csv_table_function_using_dbfs(spark): + """ + Test loading CSV from DBFS - THIS SHOULD WORK + """ + dbutils = DBUtils(spark) + + # Create test data + test_data = [ + ("ORG001", "Test Hospital", "Active"), + ("ORG002", "Test Clinic", "Inactive"), + ("ORG003", "Test Surgery", "Active") + ] + columns = ["OrganisationID", "Name", "Status"] + test_df = spark.createDataFrame(test_data, columns) + + # Write to DBFS + test_id = random.randint(10000, 99999) + dbfs_path = f"/tmp/test_dbfs_{test_id}" + filename = "Contact_Details.csv" + + # Write using Spark (creates directory with part files) + test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(dbfs_path) + + print(f"โœ“ Wrote to DBFS: {dbfs_path}") + + # Setup paths + test_id = random.randint(10000, 99999) + dbfs_folder = f"/tmp/test_dbfs_{test_id}" + filename = "Contact_Details.csv" + + # STEP 1: Write to temporary location (Spark creates part files) + temp_write_path = f"{dbfs_folder}/temp_write" + test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_write_path) + + print(f"โœ“ Wrote to temp location: {temp_write_path}") + + # STEP 2: Find the actual CSV part file that was created + files = dbutils.fs.ls(temp_write_path) + csv_part_file = [f.path for f in files if f.path.endswith('.csv')][0] + + print(f"โœ“ Found part file: {csv_part_file}") + + # STEP 3: Copy it to a proper filename + final_csv_path = f"{dbfs_folder}/{filename}" + dbutils.fs.cp(csv_part_file, final_csv_path) + + print(f"โœ“ Copied to proper filename: {final_csv_path}") + + # Test our function with DBFS path + base_path = f"dbfs:{dbfs_folder}/" + df = load_csv_table(spark, base_path, filename) + # worked but its using partials files in a folder directory - i want to test with an actual full file + # df = load_csv_table_copy(spark, base_path, "") # Empty filename since Spark reads the whole directory + + print(f"โœ“ Loaded using: load_csv_table(spark, '{base_path}', '{filename}')") + df.show() + + # Assertions + assert df.count() == 3, f"Expected 3 rows, got {df.count()}" + assert "OrganisationID" in df.columns + assert "Name" in df.columns + assert "Status" in df.columns + + first_row = df.collect()[0] + assert first_row["OrganisationID"] == "ORG001" + + # Cleanup + dbutils.fs.rm(dbfs_path, recurse=True) + print("โœ… DBFS test PASSED!") + +####################################################### +###################################################### +## Junk because poc but very small chance useful for what tried if issues soon +######################################################## +#### not accessible by spark +# Fixture to create a temporary CSV for testing +# @pytest.fixture +# def mock_csv_data(spark, tmp_path): +# # Sample data +# data = [ +# ("ORG001", "Test Hospital", "Active"), +# ("ORG002", "Test Clinic", "Inactive"), +# ("ORG003", "Test Surgery", "Active") +# ] +# columns = ["OrganisationID", "Name", "Status"] +# test_df = spark.createDataFrame(data, columns) + +# # Unique temp folder +# test_id = random.randint(10000, 99999) +# temp_path = tmp_path / f"test_csv_{test_id}" +# temp_path.mkdir(parents=True, exist_ok=True) + +# # Databricks-safe CSV write: convert to Pandas and write +# csv_path = temp_path / "Contact_Details.csv" +# test_df.toPandas().to_csv(csv_path, index=False) + +# yield str(temp_path), "Contact_Details.csv" + +# # Cleanup +# shutil.rmtree(temp_path, ignore_errors=True) + +# # Example test using the fixture +# def test_csv_exists(mock_csv_data): +# folder, filename = mock_csv_data +# full_path = os.path.join(folder, filename) +# print(f"Checking CSV file exists at: {full_path}") +# assert os.path.exists(full_path) + +# Example using dbutils worked +# def test_dbutils_write_and_read_simple(spark): +# """ +# Simple test: Can we write a CSV to DBFS and read it back with Spark? +# This proves the core concept works. +# """ +# from pyspark.dbutils import DBUtils +# dbutils = DBUtils(spark) + +# # 0. CLEAN UP ANY STALE DATA FIRST +# test_path = "/tmp/simple_test_12345" +# try: +# dbutils.fs.rm(test_path, recurse=True) +# print(f"โœ“ Cleaned up stale data at: {test_path}") +# except: +# print(f"โœ“ No stale data to clean") + +# # 1. Create test data +# test_data = [("A", 1), ("B", 2), ("C", 3)] +# test_df = spark.createDataFrame(test_data, ["letter", "number"]) + +# # 2. Write to DBFS using Spark +# test_path = "/tmp/simple_test_12345" +# test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(test_path) + +# print(f"โœ“ Wrote CSV to: {test_path}") + +# # 3. Check it exists using dbutils +# files = dbutils.fs.ls(test_path) +# print(f"โœ“ Files in DBFS: {[f.name for f in files]}") + +# # 4. Read it back using Spark +# df_read = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(f"dbfs:{test_path}") + +# print(f"โœ“ Read back {df_read.count()} rows") +# df_read.show() + +# # 5. Verify data +# assert df_read.count() == 3 +# assert "letter" in df_read.columns +# assert "number" in df_read.columns + +# # 6. Cleanup +# dbutils.fs.rm(test_path, recurse=True) +# print(f"โœ“ Cleaned up: {test_path}") + +# print("โœ… SUCCESS: dbutils works in .py file, can write and read from DBFS!") + +#################################################################### +#### Using our test CSV test the loader function +######################################################## +################################################# +## Hardcode in function for now instead of reference +################################################### +## This is what we want to use ultimately +# df = load_csv_table(spark, folder + "/", filename) +# def load_csv_table_copy(spark, base_path, csv_filename): +# """Load CSV from Azure storage with standard options + +# Args: +# base_path: Base path to the folder containing CSV files +# csv_filename: Name of the CSV file to load + +# Returns: +# DataFrame: Spark DataFrame with CSV data +# """ + +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load(f"{base_path}{csv_filename}") +# ) + +# # Actual test using the fixture +# def test_load_csv_table_function(spark, mock_csv_data): +# folder, filename = mock_csv_data + +# print("Folder path:", folder) +# print("Filename:", filename) +# full_path = folder + "/" + filename +# print("Full CSV path passed to loader:", full_path) +# print("Does file exist?", os.path.exists(full_path)) + +# # Load using your function + + +# df = load_csv_table_copy(spark, folder + "/", filename) + + +# # Print DataFrame schema and first few rows +# print("DataFrame schema:") +# df.printSchema() +# print("First 5 rows:") +# df.show(5) + +# # Assertions +# assert df is not None, "DataFrame should not be None" +# count = df.count() +# print("Row count:", count) +# assert count == 3, f"Expected 3 rows, got {count}" +# assert "OrganisationID" in df.columns +# assert "Name" in df.columns +# assert "Status" in df.columns + +# # Check first row +# first_row = df.collect()[0] +# print("First row:", first_row.asDict()) +# assert first_row["OrganisationID"] == "ORG001" +# assert first_row["Name"] == "Test Hospital" +# assert first_row["Status"] == "Active" + + +# %pip install -e "/Workspace/Repos/philip.tate@nhs.net/PT Separate Feature Branch" +# %pip install pytest>=7.0 +# !python -m pytest # tests/unit-tests/ -v -p no:cacheprovider +# import os +# os.environ['PYTHONDONTWRITEBYTECODE'] = '1' +# import pytest +# tests_path = "/Workspace/Repos/philip.tate@nhs.net/PT Separate Feature Branch/tests/unit-tests/" +# !python -m pytest tests/unit-tests/ -v -p no:cacheprovider +# pytest.main([tests_path, "-v", "-p", "no:cacheprovider"]) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1707b8a..ad75182 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,32 +1,52 @@ -[project] -name = "Workflow_POC" -version = "0.0.1" -authors = [{ name = "philip.tate@nhs.net" }] -requires-python = ">=3.10,<=3.13" -dependencies = [ - # Any dependencies for jobs and pipelines in this project can be added here - # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies - # - # LIMITATION: for pipelines, dependencies are cached during development; - # add dependencies to the 'environment' section of your pipeline.yml file instead -] - -[dependency-groups] -dev = [ - "pytest", - "databricks-dlt", - "databricks-connect>=15.4,<15.5", -] - -[project.scripts] -main = "Workflow_POC.main:main" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src"] - -[tool.black] -line-length = 125 +[project] +name = "Workflow_POC" +version = "0.0.1" +authors = [{ name = "philip.tate@nhs.net" }] +description = "POC" +readme = "README.md" +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of your pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "pytest-mock", # Very useful for mocking dbutils later + "databricks-dlt", + "databricks-connect>=15.4,<15.5", +] + +[project.scripts] +main = "Workflow_POC.main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +# Wheel packing came in the template :) qqqq +[tool.hatch.build.targets.wheel] +packages = ["src"] + +[tool.black] +line-length = 125 + +[tool.pytest.ini_options] +minversion = "6.0" +# This tells pytest to look in 'tests' by default +testpaths = ["tests"] +# This is the "Magic Fix" for routing. It adds 'src' to the path automatically! +pythonpath = ["src"] +python_files = ["test_*.py"] +addopts = [ + "-v", + "-s", + "--tb=short" +] +markers = [ + "databricks: for example we might use this marker to exclude a specific unit test from git env test running so it only run in dbx env", +] \ No newline at end of file diff --git a/pytest.ini.-use toml instead.txt b/pytest.ini.-use toml instead.txt new file mode 100644 index 0000000..78b40db --- /dev/null +++ b/pytest.ini.-use toml instead.txt @@ -0,0 +1,30 @@ +# qqqq todo ai generated ask someone to scan over it and give a read of the ins and outs +[pytest] +# Pytest configuration for Databricks unit tests + +# Test discovery patterns +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Test paths +testpaths = tests + +# Output options +addopts = + -v + --tb=short + --strict-markers + --disable-warnings + +# Markers for organizing tests +markers = + unit: Unit tests that don't require external resources + integration: Integration tests that may require external systems + slow: Tests that take a long time to run + +# Minimum Python version +minversion = 3.8 + +# Directory patterns to ignore +norecursedirs = .git .tox dist build *.egg .venv venv \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..55b033e --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +pytest \ No newline at end of file diff --git a/resources/pipeline/ods_ingestion.yml b/resources/pipeline/ods_ingestion.yml index 2685775..8a2ffb1 100644 --- a/resources/pipeline/ods_ingestion.yml +++ b/resources/pipeline/ods_ingestion.yml @@ -1,86 +1,99 @@ -############################### -## POC notes - DELETE LATER -############################### -## We should think about these resource files I think potentially a .yml per layer bronze.yml may make sense -## We will not define schemas here -## We use this file to expose from databricks.yml the variables we need to set up the pipeline -## We will define too variables just for the set of pipelines here too if we start running layer based .ymls then we would have layer level variables here -############################### -## If we want specific pipeline resource file per .py file we should use this i think - # libraries: - # - notebook: - # path: ../../src/ingestion/ods_ingest.py -## if we want per layer maybe - # libraries: - # - glob: - # # if doing a pipeline per layer would do something like - # include: ../../src/ingestion/**.py -## if we want per domain maybe - # libraries: - # - glob: - # # if doing a pipeline per layer would do something like - # include: ../../src/ingestion/ods_*.py -############################### - -# qqqq discus where want these things to live if it was using a wheel then the python file could be literally a table and a foreach -##### -# If we are running multlipe pipelines we may define all their vars at the top -##### - - -# qqqq -## im thinking var for in script var <-- also no because i cand get bundle.xyz and no all vars seem accessible everywhere i get catalog from databricks.yml -## bundle for vars originating from databricks.ymly -### i get vars from databricks -## pipeline. from pipeline files -## but files run, it shouldnt be bundle and pipeline it should represent the scope they come from - -## qqqq i like the top level config value to pass i do not like construction vars in a yml instead of python but -# Error: cannot create pipeline: The target schema field is required for UC pipelines. Reason: DLT requires specifying a target schema for UC pipelines. Please use the TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING TABLE statement if you do not wish to publish your dataset.. -# Error: cannot update pipeline: Specified 'schema' field in the pipeline settings is illegal. Reason: Cannot unset 'schema' field once it's defined in the pipeline spec. Please create a new DLT pipeline. For more information about publishing modes, see https://docs.databricks.com/en/dlt/migrate-to-dpm.html. -variables: - layer: - default: bronze - description: bronze, silver, transfrormations etc - - -x-bronze-config: &bronze-config - bundle.env_name: ${var.env_name} - bundle.storage_account: ${var.storage_account} #storage is environment specific so defined in databricks.yml - pipeline.layer: ${var.layer} # if we are doing layer based resource files qqqq get from var - # f"{ADLS_PROTOCOL}{container}@{storage_account}{ADLS_SUFFIX}/ -> py adds {folder_name}/" - pipeline.storage_container_path: "abfss://${var.layer}@${var.storage_account}.dfs.core.windows.net/" - -resources: - pipelines: - pipeline_ods_ingestion: - name: ods_ingestion - libraries: - - glob: - # if doing a pipeline per layer would do something like - # include: ../../src/ingestion/ - might work - # include: ../../src/ingestion/*.py - doesnt work - include: ../../src/ingestion/ods_ingest.py - photon: true - # qqqq good practice to specify its something to do with dlt having beta version? - channel: current - # By defining catalog here we set it for all jobs in the pipeline without needing to specify it witht he variable when defining a table - catalog: ${var.catalog} - target: ${var.schema_prefix}${var.layer}_ods ## AI said missing this qqqq i dont want this hard coded here - serverless: true - # qqqq dont think i need this here DELETE root_path: ../../src/ingestion - # qqqq config is only at pipeline level use yml anchor points if need to reuse - configuration: - ################ Map Databricks Bundle variables to Spark Config Properties ################ - # Map the Bundle variables (from databricks.yml) to Spark config properties - # The key names here MUST match what you use in spark.conf.get() in Python! - # bundle.env_name: ${var.env_name} - # bundle.schema_prefix: ${var.schema_prefix} - qqqq setting schema now in the yml - # bundle.storage_account: ${var.storage_account} - ############### Resource yml files for set of pipelines ################# - # If we do bronze, silver ... tranformation based layers with own yml files will define layer level vars here - # for example this would be - # bundle.layer_name: bronze -> #schema_layer = "bronze_" -> # schema_layer = park.conf.get("bundle.layer_name") - # configuration: - <<: *bronze-config #config anchor point for bronze layer so all pipelines in this file will have this set of configs - pipeline.domain: ods # if we then want to apply per pipeline variable here \ No newline at end of file +############################### +## POC notes - DELETE LATER +############################### +## We should think about these resource files I think potentially a .yml per layer bronze.yml may make sense +## We will not define schemas here +## We use this file to expose from databricks.yml the variables we need to set up the pipeline +## We will define too variables just for the set of pipelines here too if we start running layer based .ymls then we would have layer level variables here +############################### +## If we want specific pipeline resource file per .py file we should use this i think + # libraries: + # - notebook: + # path: ../../src/ingestion/ods_ingest.py +## if we want per layer maybe + # libraries: + # - glob: + # # if doing a pipeline per layer would do something like + # include: ../../src/ingestion/**.py +## if we want per domain maybe + # libraries: + # - glob: + # # if doing a pipeline per layer would do something like + # include: ../../src/ingestion/ods_*.py +############################### + +# qqqq discus where want these things to live if it was using a wheel then the python file could be literally a table and a foreach +##### +# If we are running multlipe pipelines we may define all their vars at the top +##### + + +# qqqq +## im thinking var for in script var <-- also no because i cand get bundle.xyz and no all vars seem accessible everywhere i get catalog from databricks.yml +## bundle for vars originating from databricks.ymly +### i get vars from databricks +## pipeline. from pipeline files +## but files run, it shouldnt be bundle and pipeline it should represent the scope they come from + +## qqqq i like the top level config value to pass i do not like construction vars in a yml instead of python but +# Error: cannot create pipeline: The target schema field is required for UC pipelines. Reason: DLT requires specifying a target schema for UC pipelines. Please use the TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING TABLE statement if you do not wish to publish your dataset.. +# Error: cannot update pipeline: Specified 'schema' field in the pipeline settings is illegal. Reason: Cannot unset 'schema' field once it's defined in the pipeline spec. Please create a new DLT pipeline. For more information about publishing modes, see https://docs.databricks.com/en/dlt/migrate-to-dpm.html. +variables: + layer: + default: bronze + description: bronze, silver, transfrormations etc + domain: + default: ods + description: pipeline per domain I expect + + +x-bronze-config: &bronze-config + bundle.env_name: ${var.env_name} + bundle.storage_account: ${var.storage_account} #storage is environment specific so defined in databricks.yml + pipeline.layer: ${var.layer} # if we are doing layer based resource files qqqq get from var + # f"{ADLS_PROTOCOL}{container}@{storage_account}{ADLS_SUFFIX}/ -> py adds {folder_name}/" + pipeline.storage_container_path: "abfss://${var.layer}@${var.storage_account}.dfs.core.windows.net/" + +resources: + pipelines: + pipeline_ods_ingestion: + name: ods_ingestion + libraries: + ####### it seems without a wheel we cannot provide the other files + # This is your DLT entry point + #- file: + # path: ../../src/ingestion/ods_ingest.py + + # This makes 'import utils' possible. + # - glob: + # include: ../../src/utils/ + - glob: + # if doing a pipeline per layer would do something like + # include: ../../src/ingestion/ - might work + # include: ../../src/ingestion/*.py - doesnt work + include: ../../src/ingestion/ods_ingest.py # -worked + # include: ../../src/**/*.py + #- folder: ../../src/utils + photon: true + # qqqq good practice to specify its something to do with dlt having beta version? + channel: current + # By defining catalog here we set it for all jobs in the pipeline without needing to specify it witht he variable when defining a table + catalog: ${var.catalog} + target: ${var.schema_prefix}${var.layer}_${var.domain} + serverless: true + # qqqq dont think i need this here DELETE root_path: ../../src/ingestion + # qqqq config is only at pipeline level use yml anchor points if need to reuse + configuration: + ################ Map Databricks Bundle variables to Spark Config Properties ################ + # Map the Bundle variables (from databricks.yml) to Spark config properties + # The key names here MUST match what you use in spark.conf.get() in Python! + # bundle.env_name: ${var.env_name} + # bundle.schema_prefix: ${var.schema_prefix} - qqqq setting schema now in the yml + # bundle.storage_account: ${var.storage_account} + ############### Resource yml files for set of pipelines ################# + # If we do bronze, silver ... tranformation based layers with own yml files will define layer level vars here + # for example this would be + # bundle.layer_name: bronze -> #schema_layer = "bronze_" -> # schema_layer = park.conf.get("bundle.layer_name") + # configuration: + <<: *bronze-config #config anchor point for bronze layer so all pipelines in this file will have this set of configs + pipeline.domain: ${var.domain} # if we then want to apply per pipeline variable here \ No newline at end of file diff --git a/notebooks/.gitinclude b/src/__init__.py similarity index 100% rename from notebooks/.gitinclude rename to src/__init__.py diff --git a/src/ingestion/ods_ingest.py b/src/ingestion/ods_ingest.py index 9812f39..cda8339 100644 --- a/src/ingestion/ods_ingest.py +++ b/src/ingestion/ods_ingest.py @@ -1,185 +1,240 @@ -# for making spark tables -# qqqq problem i am having is that we are setting the schema, and dev has schema set as user names -# i want to use the databricks.yml schema name for dev, and for staging and prod i want to set it to bronze_ods in this script -from pyspark import pipelines as dp + +# worked but improving pathing +# unless we set up a wheel artifact this is required to access utils +import sys +import os +sys.path.append(os.path.abspath('..')) -# Fixed System Constants these and some of this stuff should be going in a helper i think -#ADLS_PROTOCOL = "abfss://" -#ADLS_SUFFIX = ".dfs.core.windows.net" - -# 1. Get the Catalog name -# qqqq i dont think i want a default id prefer an error i think -# if set this in pipeline yml we wont need it -#catalog_name = spark.conf.get("bundle.catalog") - -# 2. Get the Schema Prefix (This is what changes between environments) -# In Dev, this will be the username. In Staging/Prod, it will be blank. -#schema_user_prefix = spark.conf.get("bundle.schema_prefix") -# this will often be a medallion layer but in src we also have transformations and ingestion so i think this mirror folders in source would be logical if team agrees qqqq -#schema_layer = "bronze_" -#schema_domain = "ods" #qqqq check what terminiology we want here - -# Construct the final schema name -#schema_name = (schema_user_prefix + schema_layer + schema_domain) -#print(schema_name) -# The container likely should mirror the layer name? -# container_layer ?? qqqq -#container = spark.conf.get("bundle.layer") # layer is bronze silver etc -# This likely should be dev staging prod -# storage_environment ?? qqqq -# wouldnt have default +from pyspark import pipelines as dp +from utils.loaders import load_csv_table + # storage_account = spark.conf.get("bundle.storage_account") # 'unifiedrptdeltalake' storage_container_path = spark.conf.get("pipeline.storage_container_path") -# In our storage our folders maybe should be domain based and if we thing this is manageable as hard rule this variable could be called domain_folder or similar qqqq -# domain_folder ?? qqqq folder_name = spark.conf.get("pipeline.domain") # ods -#folder_location_path = f"{ADLS_PROTOCOL}{container}@{storage_account}{ADLS_SUFFIX}/{folder_name}/" -folder_location_path = f"{storage_container_path }/{folder_name}/" -# "abfss://bronze@unifiedrptdeltalake.dfs.core.windows.net/ods +folder_location_path = f"{storage_container_path}/{folder_name}/" + print(folder_location_path) -@dp.table( - # qqqq was f"{schema_name}.Additional_Attributes_Details" but worked before now need to do it this way???!!! - name="Additional_Attributes_Details", - comment="Import raw Additional_Attributes_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Additional_Attributes_Details.csv" - ) - ) - -@dp.table( - name="Code_System_Details", - comment="Import raw Code_System_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Code_System_Details.csv" - ) - ) - -@dp.table( - name="Contact_Details", - comment="Import raw Contact_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Contact_Details.csv" - ) - ) - -@dp.table( - name="Manifest_Details", - comment="Import raw Manifest_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Manifest_Details.csv" - ) - ) - -@dp.table( - name="Organisation_Details", - comment="Import raw Organisation_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Organisation_Details.csv" - ) - ) - -@dp.table( - name="OtherID_Details", - comment="Import raw OtherID_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}OtherID_Details.csv" - ) - ) - -@dp.table( - name="PrimaryRole_Details", - comment="Import raw PrimaryRole_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}PrimaryRole_Details.csv" - ) - ) - -@dp.table( - name="Relationship_Details", - comment="Import raw Relationship_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Relationship_Details.csv" - ) - ) - -@dp.table( - name="Role_Details", - comment="Import raw Role_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Role_Details.csv" - ) - ) - -@dp.table( - name="Successor_Details", - comment="Import raw Successor_Details" -) -def azure_csv_table(): - return ( - spark.read.format("csv") - .option("header", "true") - .option("inferSchema", "true") - .load( - f"{folder_location_path}Successor_Details.csv" - ) - ) +# List of table names +TABLE_NAMES = [ + "Additional_Attributes_Details", + "Code_System_Details", + "Contact_Details", + "Manifest_Details", + "Organisation_Details", + "OtherID_Details", + "PrimaryRole_Details", + "Relationship_Details", + "Role_Details", + "Successor_Details", +] + + +# Define all ODS tables with their configurations +# tables as keys because unique +# only using dictionary because expect future use to have more to it + +# Generate table configurations dynamically +ODS_TABLES = { + table_name: { + "csv_filename": f"{table_name}.csv", + "comment": f"Import raw {table_name}" + } + for table_name in TABLE_NAMES +} +# ALternatively if need to set the values specifically +# ODS_TABLES = { +# "Additional_Attributes_Details": { +# "csv_filename": "Additional_Attributes_Details.csv", +# "comment": "Import raw Additional_Attributes_Details" +# }, +# "Code_System_Details": { +# "csv_filename": "Code_System_Details.csv", +# "comment": "Import raw Code_System_Details" +# } .... +# + + +# Create DLT tables dynamically +for table_name, config in ODS_TABLES.items(): + # Create a closure to capture the current loop variables + def create_table(name=table_name, cfg=config): + @dp.table(name=name, comment=cfg["comment"]) + def table_loader(): + # spark is defined by databricks environment so may not need to directly define it + return load_csv_table( spark, folder_location_path, cfg["csv_filename"]) + return table_loader + + create_table() + + +####################################################################### +### DEL just keeping because poc +########################################################## + + +# def load_csv_table(table_name): +# """Load CSV from Azure storage with standard options""" +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load(f"{folder_location_path}{table_name}.csv") +# ) + +# # Create DLT tables dynamically +# for table_name, comment in ODS_TABLES: +# # Create a closure to capture the current loop variables +# def create_table(name=table_name, desc=comment): +# @dp.table(name=name, comment=desc) +# def table_loader(): +# return load_csv_table(name) +# return table_loader + +# create_table() + +# @dp.table( +# # qqqq was f"{schema_name}.Additional_Attributes_Details" but worked before now need to do it this way???!!! +# name="Additional_Attributes_Details", +# comment="Import raw Additional_Attributes_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Additional_Attributes_Details.csv" +# ) +# ) + +# @dp.table( +# name="Code_System_Details", +# comment="Import raw Code_System_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Code_System_Details.csv" +# ) +# ) + +# @dp.table( +# name="Contact_Details", +# comment="Import raw Contact_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Contact_Details.csv" +# ) +# ) + +# @dp.table( +# name="Manifest_Details", +# comment="Import raw Manifest_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Manifest_Details.csv" +# ) +# ) + +# @dp.table( +# name="Organisation_Details", +# comment="Import raw Organisation_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Organisation_Details.csv" +# ) +# ) + +# @dp.table( +# name="OtherID_Details", +# comment="Import raw OtherID_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}OtherID_Details.csv" +# ) +# ) + +# @dp.table( +# name="PrimaryRole_Details", +# comment="Import raw PrimaryRole_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}PrimaryRole_Details.csv" +# ) +# ) + +# @dp.table( +# name="Relationship_Details", +# comment="Import raw Relationship_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Relationship_Details.csv" +# ) +# ) + +# @dp.table( +# name="Role_Details", +# comment="Import raw Role_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Role_Details.csv" +# ) +# ) + +# @dp.table( +# name="Successor_Details", +# comment="Import raw Successor_Details" +# ) +# def azure_csv_table(): +# return ( +# spark.read.format("csv") +# .option("header", "true") +# .option("inferSchema", "true") +# .load( +# f"{folder_location_path}Successor_Details.csv" +# ) +# ) ###### Try this diff --git a/src/transformations/__init__.py b/src/transformations/__init__.py new file mode 100644 index 0000000..cb3d90c --- /dev/null +++ b/src/transformations/__init__.py @@ -0,0 +1,2 @@ +from .date_transforms import working_days_monthly +__all__ = ["working_days_monthly"] \ No newline at end of file diff --git a/src/transformations/date_transforms.py b/src/transformations/date_transforms.py new file mode 100644 index 0000000..df566e7 --- /dev/null +++ b/src/transformations/date_transforms.py @@ -0,0 +1,69 @@ +####### THIS CODE IS GENUINCE FROM ANOTHER TEAM IVE JUST MADE SPARK DEPENDENCY INJECTED ######## +import os +from pyspark.sql import functions as fn # Added + +## def working_days_monthly(df, col_start_date): # Original +def working_days_monthly(spark, df, col_start_date): + """ + Description: Adds a column to monthly level data with working days, based on the first day of the month. + + Parameters: + df: the dataframe to perform the transformation on + col_start_date: the column that contains the first date of the month in your dataframe + + Returns: + output: the initial dataframe with the new column added with working days for the month + """ + + ## Added because i dont have the global file + calendar_full = "/tmp/standard_calendar.parquet" + + # 1. RELIABLE CHECK: Does the file exist? + file_exists = False + try: + dbutils.fs.ls(calendar_full) + file_exists = True + except: + file_exists = False + + # 2. GENERATE if missing + if not file_exists: + print(f"Creating missing calendar at {calendar_full}...") + spark.range(0, 365 * 10).select( + fn.expr("date_add('2020-01-01', cast(id as int))").alias("Date") + ).withColumn("Month_Start", fn.trunc("Date", "MM")) \ + .withColumn("Working_Day_Type", fn.when(fn.date_format("Date", "EEEE").isin("Saturday", "Sunday"), "N").otherwise("Y")) \ + .withColumn("Working_Day_Calc", fn.when(fn.col("Working_Day_Type") == "Y", 1).otherwise(0)) \ + .write.mode("overwrite").parquet(calendar_full) + + # 3. READ (This will now definitely work) + cal_df = spark.read.parquet(calendar_full) # This is a global file i dont have + + cal_df = cal_df.filter(fn.col("Working_Day_Type") == "Y").dropDuplicates() + + cal_df = cal_df.withColumn( + "Month_Start", + fn.date_format(fn.to_date(fn.col("Month_Start"), "yyyy-MM-dd"), "yyyy-MM-dd").cast("date") + ) + + # Aggregate by Month_Start and sum Working_Day_Calc + cal_df = cal_df.groupBy( + "Month_Start" + ).agg( + fn.sum("Working_Day_Calc").alias("Total_Working_Days") + ) + + # Convert start date in df to date format for comparison + df = df.withColumn(col_start_date, fn.to_date(fn.col(col_start_date).cast("date"))) + + # Create a new column with the total working days + output = df.join( + cal_df.select("Month_Start", "Total_Working_Days"), + fn.col(col_start_date) == fn.col("Month_Start"), + "left" + ).withColumn( + "working_days", + fn.coalesce(fn.col("Total_Working_Days"), fn.lit(0)) + ).drop("Month_Start", "Total_Working_Days") + + return output diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..c065388 --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,3 @@ +# qqqq worked but not for test -> from utils.loaders import load_csv_table +from .loaders import load_csv_table +__all__ = ["load_csv_table"] \ No newline at end of file diff --git a/src/utils/loaders.py b/src/utils/loaders.py new file mode 100644 index 0000000..68a4f5b --- /dev/null +++ b/src/utils/loaders.py @@ -0,0 +1,19 @@ +"""Data loading utilities for Databricks pipelines""" +# i dont think we will want these as a package just as a module we wont be expoting and its just and extra steps for analyst which currently i do not think will provide value until they request it and will get in their way +def load_csv_table(spark, base_path, csv_filename): + """Load CSV from Azure storage with standard options + + Args: + base_path: Base path to the folder containing CSV files + csv_filename: Name of the CSV file to load + + Returns: + DataFrame: Spark DataFrame with CSV data + """ + + return ( + spark.read.format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(f"{base_path}{csv_filename}") + ) \ No newline at end of file diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..8b496d9 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,3 @@ +Tests live here +Run_Tests allows manual running for devs +Routes handled by conftest and toml diff --git a/tests/Run_Tests.ipynb b/tests/Run_Tests.ipynb new file mode 100644 index 0000000..b02ab2b --- /dev/null +++ b/tests/Run_Tests.ipynb @@ -0,0 +1,215 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "60b0c695-bb33-4088-9cb9-a9a451ebc89b", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Test Runner Notebook\n", + "\n", + "Tests can be triggered from here.\n", + "GitActions in future would want to on deploying dabs trigger integration tests and within gitactions run unit tests as part of pull request process." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8e7bb6bc-d6d2-4466-8db1-970dad0f3d30", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Test Env Setup" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ff38cce8-1271-46d6-87f9-5c170b38fe45", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# MAGIC %pip install pytest\n", + "# because doesnt get pytest from toml???\n", + "import pytest\n", + "import sys\n", + "#import os\n", + "\n", + "# This looks 'up' one level from this notebook to find your project root\n", + "# using toml now\n", + "# repo_root = os.path.abspath('..') \n", + "# sys.path.append(f\"{repo_root}/src\")\n", + "\n", + "# Prevent __pycache__ creation\n", + "sys.dont_write_bytecode = True\n", + "\n", + "# print(f\"Project root set to: {repo_root}\")\n", + "print(\"Setup run\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c0b2b59b-7a20-43ac-becf-b1be7f487d68", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Unit Test Runner" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ae7b9eca-1b77-4b3f-bfb8-27abef298460", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "\n", + "# Run your test file\n", + "# This will run every file starting with 'test_' in that folder\n", + "pytest.main([\n", + " \"unit-tests\",\n", + " \"-v\", # verbose\n", + " \"-s\", # show print output\n", + " \"--tb=short\" # short tracebacks\n", + "])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f5a4ed24-9c54-47e0-96ca-d6b5ae80a1b1", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Integration Test Runner" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "818af375-e198-4799-b494-15d4f273f802", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Run your test file\n", + "# This will run every file starting with 'test_' in that folder\n", + "pytest.main([\n", + " \"integration-tests\",\n", + " \"-v\", # verbose\n", + " \"-s\", # show print output\n", + " \"--tb=short\" # short tracebacks\n", + "])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "eb174373-6843-4d70-bc34-d95d47db7c2d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "Run_Tests", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8da9d81 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,36 @@ +import pytest +from pyspark.sql import SparkSession + + +@pytest.fixture(scope="session") +def spark(): + # Databricks / Spark Connect + session = SparkSession.getActiveSession() + if session is not None: + yield session + return + + # Local / CI Spark + spark = ( + SparkSession.builder + .master("local[1]") + .appName("pytest-pyspark") + .config("spark.ui.enabled", "false") + .getOrCreate() + ) + + yield spark + spark.stop() + + +### Worked but want something that can work with cicd +# import pytest +# from pyspark.sql import SparkSession + +# # Use existing Spark session in Databricks (Spark Connect) +# @pytest.fixture(scope="session") +# def spark(): +# session = SparkSession.getActiveSession() +# if session is None: +# raise RuntimeError("No active Spark session found. Ensure you are running in Databricks.") +# return session \ No newline at end of file diff --git a/src/transformations/.gitinclude b/tests/data-quality-tests/.gitinclude similarity index 100% rename from src/transformations/.gitinclude rename to tests/data-quality-tests/.gitinclude diff --git a/tests/integration-tests/README.md b/tests/integration-tests/README.md new file mode 100644 index 0000000..27d48d6 --- /dev/null +++ b/tests/integration-tests/README.md @@ -0,0 +1,43 @@ +# To use this folder +- need proper pathing +- need toml and test config set up + +# Integration Tests + +These tests are not purely functional they interact with the databricks environment. +They will need dbutils spark and the file system. +We should write code to keep these concerns seperate where we can to enable clean testing + +Integration tests will be triggered by git action or can be triggered via a notebook. +They will run in databricks environment where as unit tests will be purely functional so can be run +via a notebook in databrick or as a gitaction within github. + +# Notes to help future implementation + +## A databricks job to be triggered on deploying a dab i expect +Just pseudo code +``` +bundle: + name: my-feature-tests + +resources: + jobs: + test_job: + name: "PyTest Runner" + tasks: + - task_key: run_pytest + new_cluster: + spark_version: "13.3.x-scala2.12" + node_type_id: "Standard_DS3_v2" + num_workers: 1 + notebook_task: + notebook_path: ./somepath/run_integration_tests.py + +``` +Something to add to the run tests py +``` +# Very important: Exit with the code from pytest +# so the GitHub Action knows if it failed or passed +if retcode != 0: + sys.exit(retcode) +``` \ No newline at end of file diff --git a/src/utils/.gitinclude b/tests/integration-tests/bronze/.gitkeep similarity index 100% rename from src/utils/.gitinclude rename to tests/integration-tests/bronze/.gitkeep diff --git a/tests/.gitinclude b/tests/integration-tests/gold/(Clone) .gitkeep similarity index 100% rename from tests/.gitinclude rename to tests/integration-tests/gold/(Clone) .gitkeep diff --git a/tests/integration-tests/ingestion/(Clone) .gitkeep b/tests/integration-tests/ingestion/(Clone) .gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration-tests/silver/(Clone) .gitkeep b/tests/integration-tests/silver/(Clone) .gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration-tests/transformations/(Clone) .gitkeep b/tests/integration-tests/transformations/(Clone) .gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration-tests/utils/test_loaders.py b/tests/integration-tests/utils/test_loaders.py new file mode 100644 index 0000000..a29277d --- /dev/null +++ b/tests/integration-tests/utils/test_loaders.py @@ -0,0 +1,148 @@ +import random +import pytest +from pathlib import Path +from pyspark.sql import SparkSession +import random +from pyspark.dbutils import DBUtils + + +############################### +## This is more integration now anyway i need a core function test for unit +## Spark and dbutils are not going to work in git +## Routing wont work in git actions +## Need to move things into testing config +## need to move thing into toml +## file needs to be in correct location +############################## + + +from utils import load_csv_table + + +###################################### +### PyTest doesnt have access to spark and dbutils in the same way so we are checking the test setup here ### +###################################### + +@pytest.fixture(scope="function") +def test_dbfs_setup(spark): + """ + Fixture to handle the setup and teardown of DBFS test data. + This keeps the actual test function clean. + """ + dbutils = DBUtils(spark) + test_id = random.randint(10000, 99999) + dbfs_folder = f"/tmp/test_csv_{test_id}" + filename = "Contact_Details.csv" + + # --- SETUP: Create a real CSV file --- + test_data = [ + ("ORG001", "Test Hospital", "Active"), + ("ORG002", "Test Clinic", "Inactive"), + ("ORG003", "Test Surgery", "Active") + ] + columns = ["OrganisationID", "Name", "Status"] + test_df = spark.createDataFrame(test_data, columns) + + # Write to temp then move to make it a 'proper' single CSV file + temp_path = f"{dbfs_folder}/temp" + test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_path) + + part_file = [f.path for f in dbutils.fs.ls(temp_path) if f.path.endswith('.csv')][0] + final_path = f"{dbfs_folder}/{filename}" + dbutils.fs.cp(part_file, final_path) + + # Provide the path to the test + yield f"dbfs:{dbfs_folder}/", filename + + # --- TEARDOWN: Cleanup after test --- + dbutils.fs.rm(dbfs_folder, recurse=True) + +def test_load_csv_table_using_dbfs(spark, test_dbfs_setup): + """ + Test loading CSV from DBFS using the logic from src/utils. + """ + # Unpack the fixture values + base_path, filename = test_dbfs_setup + + # ACT: Run the actual function from your src/utils + df = load_csv_table(spark, base_path, filename) + + # ASSERT + assert df.count() == 3 + assert "OrganisationID" in df.columns + + first_row = df.collect()[0] + assert first_row["OrganisationID"] == "ORG001" + print(f"โœ… Successfully verified {filename} at {base_path}") + +################### DEL below just in because poc ########################## + +# def test_load_csv_table_using_dbfs(spark): +# """ +# Test loading CSV from DBFS - THIS SHOULD WORK +# """ +# dbutils = DBUtils(spark) + +# # Create test data +# test_data = [ +# ("ORG001", "Test Hospital", "Active"), +# ("ORG002", "Test Clinic", "Inactive"), +# ("ORG003", "Test Surgery", "Active") +# ] +# columns = ["OrganisationID", "Name", "Status"] +# test_df = spark.createDataFrame(test_data, columns) + +# # Write to DBFS +# test_id = random.randint(10000, 99999) +# dbfs_path = f"/tmp/test_dbfs_{test_id}" +# filename = "Contact_Details.csv" + +# # Write using Spark (creates directory with part files) +# test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(dbfs_path) + +# print(f"โœ“ Wrote to DBFS: {dbfs_path}") + +# # Setup paths +# test_id = random.randint(10000, 99999) +# dbfs_folder = f"/tmp/test_dbfs_{test_id}" +# filename = "Contact_Details.csv" + +# # STEP 1: Write to temporary location (Spark creates part files) +# temp_write_path = f"{dbfs_folder}/temp_write" +# test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_write_path) + +# print(f"โœ“ Wrote to temp location: {temp_write_path}") + +# # STEP 2: Find the actual CSV part file that was created +# files = dbutils.fs.ls(temp_write_path) +# csv_part_file = [f.path for f in files if f.path.endswith('.csv')][0] + +# print(f"โœ“ Found part file: {csv_part_file}") + +# # STEP 3: Copy it to a proper filename +# final_csv_path = f"{dbfs_folder}/{filename}" +# dbutils.fs.cp(csv_part_file, final_csv_path) + +# print(f"โœ“ Copied to proper filename: {final_csv_path}") + +# # Test our function with DBFS path +# base_path = f"dbfs:{dbfs_folder}/" +# df = load_csv_table(spark, base_path, filename) +# # worked but its using partials files in a folder directory - i want to test with an actual full file +# # df = load_csv_table_copy(spark, base_path, "") # Empty filename since Spark reads the whole directory + +# print(f"โœ“ Loaded using: load_csv_table(spark, '{base_path}', '{filename}')") +# df.show() + +# # Assertions +# assert df.count() == 3, f"Expected 3 rows, got {df.count()}" +# assert "OrganisationID" in df.columns +# assert "Name" in df.columns +# assert "Status" in df.columns + +# first_row = df.collect()[0] +# assert first_row["OrganisationID"] == "ORG001" + +# # Cleanup +# dbutils.fs.rm(dbfs_path, recurse=True) +# print("โœ… DBFS test PASSED!") \ No newline at end of file diff --git a/tests/unit-tests/ReadMe.md b/tests/unit-tests/ReadMe.md new file mode 100644 index 0000000..bd9bb5c --- /dev/null +++ b/tests/unit-tests/ReadMe.md @@ -0,0 +1,19 @@ +# Unit Tests + +The unit tests will allow us to reuse code confidently. We can change how a utility function does what it does and know it has not broken other code dependent on it for example. + +# Running unit tests + +## Running in Databricks + +## How github runs Unit Test + +## How to add to Unit Tests + +## When to add Unit Tests + +## Cursory Notes on Peer Reviewing Unit Tests or recommend their addition + +# Folder Structure + +The structure is mirroring the src/layer structure \ No newline at end of file diff --git a/tests/unit-tests/bronze/(Clone) (Clone) .gitinclude b/tests/unit-tests/bronze/(Clone) (Clone) .gitinclude new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/gold/(Clone) (Clone) .gitinclude b/tests/unit-tests/gold/(Clone) (Clone) .gitinclude new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/ingestion/(Clone) .gitinclude b/tests/unit-tests/ingestion/(Clone) .gitinclude new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/silver/(Clone) (Clone) (Clone) .gitinclude b/tests/unit-tests/silver/(Clone) (Clone) (Clone) .gitinclude new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/transformations/(Clone) (Clone) .gitinclude b/tests/unit-tests/transformations/(Clone) (Clone) .gitinclude new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/transformations/test_date_transformations.py b/tests/unit-tests/transformations/test_date_transformations.py new file mode 100644 index 0000000..9888c50 --- /dev/null +++ b/tests/unit-tests/transformations/test_date_transformations.py @@ -0,0 +1,229 @@ +# test_working_days_monthly.py +import sys +import pytest +from pyspark.sql import SparkSession +from pyspark.sql import functions as fn + +# Project imports +# should be provided by toml +# PROJECT_ROOT = "/Workspace/Users/philip.tate@nhs.net/PT Separate Feature Branch/src" +# sys.path.insert(0, PROJECT_ROOT) + +from transformations import working_days_monthly + + +# ============================================================================ +# FIXTURES +# ============================================================================ +# should be automatic +# @pytest.fixture(scope="session") +# def spark(): +# """Provide Spark session for all tests""" +# session = SparkSession.getActiveSession() +# if session is None: +# raise RuntimeError("No active Spark session found. Running in Databricks?") +# return session + + +@pytest.fixture(scope="session") +def spark(): + """Provide Spark session for all tests""" + session = SparkSession.getActiveSession() + if session is None: + raise RuntimeError("No active Spark session found. Running in Databricks?") + return session + + +@pytest.fixture(scope="function") +def sample_dataframe_january_2023(spark): + """ + Fixture providing a simple test dataframe for January 2023 + """ + data = [("2023-01-01",)] + columns = ["start_date"] + return spark.createDataFrame(data, columns) + + +@pytest.fixture(scope="function") +def sample_dataframe_multiple_months(spark): + """ + Fixture providing a dataframe with multiple months + """ + data = [ + ("2023-01-01",), + ("2023-02-01",), + ("2023-03-01",), + ] + columns = ["start_date"] + return spark.createDataFrame(data, columns) + + +@pytest.fixture(scope="function") +def sample_dataframe_with_extra_columns(spark): + """ + Fixture providing a dataframe with additional columns beyond start_date + """ + data = [ + ("2023-01-01", "ORG001", 100), + ("2023-02-01", "ORG002", 200), + ] + columns = ["start_date", "org_id", "value"] + return spark.createDataFrame(data, columns) + + +# ============================================================================ +# TESTS +# ============================================================================ + +def test_working_days_adds_column(spark, sample_dataframe_january_2023): + """ + Test that working_days_monthly adds the working_days column + """ + # ACT + result_df = working_days_monthly(spark, sample_dataframe_january_2023, "start_date") + + # ASSERT + assert "working_days" in result_df.columns, "working_days column should be added" + assert result_df.count() == 1, f"Expected 1 row, got {result_df.count()}" + + print("โœ… Column added successfully") + + +def test_working_days_january_2023_count(spark, sample_dataframe_january_2023): + """ + Test that January 2023 returns 22 working days + January 2023: 31 days - 4 Saturdays - 5 Sundays = 22 working days + """ + # ACT + result_df = working_days_monthly(spark, sample_dataframe_january_2023, "start_date") + + # ASSERT + working_days = result_df.collect()[0]["working_days"] + assert working_days == 22, f"January 2023 should have 22 working days, got {working_days}" + + print(f"โœ… January 2023 verified: {working_days} working days") + +# Just trying out pytest.mark so can exclude by run: pytest -m "not databricks" but the intention would be unit tests via github action and integration by github action triggering test in databricks environment +@pytest.mark.databricks +def test_working_days_february_2023_count(spark): + """ + Test that February 2023 returns 20 working days + February 2023: 28 days - 4 Saturdays - 4 Sundays = 20 working days + """ + # ARRANGE + data = [("2023-02-01",)] + columns = ["start_date"] + input_df = spark.createDataFrame(data, columns) + + # ACT + result_df = working_days_monthly(spark, input_df, "start_date") + + # ASSERT + working_days = result_df.collect()[0]["working_days"] + assert working_days == 20, f"February 2023 should have 20 working days, got {working_days}" + + print(f"โœ… February 2023 verified: {working_days} working days") + + +def test_working_days_multiple_months(spark, sample_dataframe_multiple_months): + """ + Test that function handles multiple months correctly + """ + # ACT + result_df = working_days_monthly(spark, sample_dataframe_multiple_months, "start_date") + + # ASSERT + assert result_df.count() == 3, "Should have 3 rows" + + # All rows should have positive working days + results = result_df.collect() + for row in results: + assert row["working_days"] > 0, f"All rows should have positive working days" + assert row["working_days"] <= 23, f"Working days should be reasonable" + + print("โœ… Multiple months handled correctly") + + +def test_working_days_preserves_columns(spark, sample_dataframe_with_extra_columns): + """ + Test that function preserves existing columns + """ + # ACT + result_df = working_days_monthly(spark, sample_dataframe_with_extra_columns, "start_date") + + # ASSERT + assert "start_date" in result_df.columns, "Original start_date column should be preserved" + assert "org_id" in result_df.columns, "org_id column should be preserved" + assert "value" in result_df.columns, "value column should be preserved" + assert "working_days" in result_df.columns, "working_days column should be added" + + # Check data integrity + first_row = result_df.filter(fn.col("org_id") == "ORG001").collect()[0] + assert first_row["value"] == 100, "Original data should be preserved" + assert first_row["working_days"] > 0, "Working days should be calculated" + + print("โœ… All columns preserved correctly") + + +def test_working_days_with_null_dates(spark): + """ + Test handling of null dates (edge case) + """ + # ARRANGE + data = [ + ("2023-01-01",), + (None,), + ("2023-02-01",), + ] + columns = ["start_date"] + input_df = spark.createDataFrame(data, columns) + + # ACT + result_df = working_days_monthly(spark, input_df, "start_date") + + # ASSERT + assert result_df.count() == 3, "Should handle null dates without dropping rows" + + # Null date should have 0 working days (due to coalesce in function) + null_row = result_df.filter(fn.col("start_date").isNull()).collect() + if null_row: + assert null_row[0]["working_days"] == 0, "Null dates should have 0 working days" + + print("โœ… Null dates handled correctly") + + +def test_working_days_different_column_name(spark): + """ + Test with a different column name for start date + """ + # ARRANGE + data = [("2023-01-01",), ("2023-02-01",)] + columns = ["month_beginning"] # Different column name + input_df = spark.createDataFrame(data, columns) + + # ACT + result_df = working_days_monthly(spark, input_df, "month_beginning") + + # ASSERT + assert "working_days" in result_df.columns + assert result_df.count() == 2 + + print("โœ… Different column name handled correctly") + + +def test_working_days_values_are_reasonable(spark, sample_dataframe_multiple_months): + """ + Test that all working days values are within reasonable bounds + No month should have more than 23 working days or less than 19 + """ + # ACT + result_df = working_days_monthly(spark, sample_dataframe_multiple_months, "start_date") + + # ASSERT + results = result_df.collect() + for row in results: + working_days = row["working_days"] + assert 19 <= working_days <= 23, \ + f"Working days should be between 19-23, got {working_days} for {row['start_date']}" + + print("โœ… All working days values are reasonable") \ No newline at end of file diff --git a/tests/unit-tests/utils/fixtures/.gitkeep b/tests/unit-tests/utils/fixtures/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit-tests/utils/fixtures/org_data.csv b/tests/unit-tests/utils/fixtures/org_data.csv new file mode 100644 index 0000000..02795cd --- /dev/null +++ b/tests/unit-tests/utils/fixtures/org_data.csv @@ -0,0 +1,4 @@ +OrganisationID,Name,Status +ORG001,Test Hospital,Active +ORG002,Test Clinic,Inactive +ORG003,Test Surgery,Active \ No newline at end of file