
Session 7: Data Flow (Part 2)

 






Data Flow Name: df_transform_hospital_admissions

Pipeline Steps:

  1. Source (HospitalAdmissionSource):

    • Pulls data from ds_raw_hospital_admission.

  2. SelectReqdFields:

    • Renames or selects specific fields: country, indicator, etc.

  3. LookupCountry:

    • Performs a lookup using CountrySource (likely from ds_country_lookup) to enrich the data (a rough sketch of this enrichment follows this list).

  4. SelectReqdFields2:

    • Refines the result further with a new set of selected or renamed fields.

  5. Split into Weekly and Daily:

    • A Conditional Split divides the data into two branches:

      • Weekly (9 columns total)

      • Daily (filtered on the indicator column, likely via conditional logic)
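The SelectReqdFields and LookupCountry steps are not revisited in the breakdown further down, so here is a rough PySpark sketch of the equivalent select-and-enrich logic. The dataset paths and column names (e.g., reported_granularity) are assumptions for illustration, not the actual data flow definitions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hospital_admissions_sketch").getOrCreate()

# Hypothetical inputs: raw admissions plus a small country reference dataset
admissions = spark.read.parquet("/raw/hospital_admissions")
countries  = spark.read.parquet("/lookup/ds_country_lookup")

# SelectReqdFields: keep (and optionally rename) only the columns needed downstream
selected = admissions.select(
    "country", "indicator", "reported_date", "reported_granularity", "value"
)

# LookupCountry: a lookup behaves like a left outer join on the matching key,
# enriching each record with reference columns (e.g., country_code, population)
enriched = selected.join(countries, on="country", how="left")
```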

Right Panel:

  • Shows general properties.

  • Name: df_transform_hospital_admissions.

  • Description: Empty.

Bottom Panel (Data preview):

  • Currently loading: “Fetching data…”.

  • Status: Data flow debug is enabled (green).

  • Operation counts (INSERT, UPDATE, DELETE, etc.) show N/A, which suggests this is a preview run or the data has not loaded yet.


🔁 Complete Transformation Breakdown


🟦 1. Source (ds_raw_hospital_admission)

  • What it does:

    • Reads raw hospital admission data from a source dataset (e.g., CSV, database).

    • Fields: country, reported_date, hospital_occupancy_count, icu_occupancy_count, etc.
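As an illustration only (the actual source is the ADF dataset ds_raw_hospital_admission), reading raw CSV files into a DataFrame could look like this in PySpark; the path and options are assumptions:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read_raw_admissions").getOrCreate()

raw_admissions = (
    spark.read
    .option("header", True)        # first row holds the column names
    .option("inferSchema", True)   # let Spark derive the column types
    .csv("/raw/hospital_admissions/*.csv")  # hypothetical landing path
)
raw_admissions.printSchema()
```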


🟨 2. fields2 (Conditional Split)

  • What it does:

    • Splits incoming data into two branches: Weekly and Daily.

    • Based on a condition, likely a granularity flag in the data, for example:

      reported_granularity == 'weekly' => Weekly branch
      reported_granularity == 'daily'  => Daily branch
  • Why:

    • Enables separate transformation logic for weekly and daily reporting formats.
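In PySpark terms a conditional split is just two filters over the same input; a minimal sketch, continuing from the enriched DataFrame in the earlier sketch and assuming a reported_granularity column:

```python
from pyspark.sql import functions as F

# Two disjoint branches over the same input DataFrame
weekly = enriched.filter(F.col("reported_granularity") == "weekly")
daily  = enriched.filter(F.col("reported_granularity") == "daily")
```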


🟩 3. Weekly Branch

🔷 a. JoinWithDate (Join)

  • What it does:

    • Joins raw data with a Date Dimension (likely AggDimDate).

    • Join keys: reported_date from source and date from the dimension.

  • Why:

    • Enriches records with derived values like year_week, week_start_date, etc.
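A minimal PySpark sketch of this join, assuming the date dimension exposes date, year_week, and week_start_date columns (the path and column names are assumptions):

```python
# Continues the earlier sketches (same Spark session, `weekly` branch DataFrame)
dim_date = spark.read.parquet("/dim/agg_dim_date")  # hypothetical date-dimension path

joined_weekly = (
    weekly.join(dim_date, weekly["reported_date"] == dim_date["date"], how="inner")
          .drop(dim_date["date"])  # drop the duplicate join key from the dimension side
)
```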


🔷 b. PivotWeekly (Pivot)

  • What it does:

    • Pivots indicators (like hospital and ICU occupancy counts) into separate columns.

  • Group by:

    • Likely year_week, country

  • Values:

    • Transforms rows into a wider format with columns like:

      • hospital_occupancy_count

      • icu_occupancy_count

  • Why:

    • Aggregates and reshapes data for weekly reporting.
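As a rough PySpark equivalent (the actual ADF pivot settings are not visible here), grouping by week and country and spreading the indicator values into columns might look like this; the indicator values and the reported_year_week column are assumptions:

```python
from pyspark.sql import functions as F

# reported_year_week is assumed to come from the date-dimension join above
pivoted_weekly = (
    joined_weekly
    .groupBy("reported_year_week", "country")
    .pivot("indicator", ["hospital_occupancy_count", "icu_occupancy_count"])
    .agg(F.sum("value"))
)
```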


🔷 c. SortWeekly (Sort)

  • What it does:

    • Sorts the data by reported_year_week and country

  • Why:

    • Ensures data is consistently ordered before writing to sink.
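The PySpark equivalent is a single orderBy, continuing from the pivoted DataFrame above:

```python
# Deterministic ordering before the sink keeps the written output stable
sorted_weekly = pivoted_weekly.orderBy("reported_year_week", "country")
```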


🔷 d. SelectWeekly (Select)

  • What it does:

    • Keeps only required columns and renames as needed.

    • Final schema might include:

      • country, reported_year_week, hospital_occupancy_count, icu_occupancy_count

  • Why:

    • Cleans and prepares data for export.
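A PySpark sketch of the final projection; the exact output schema is an assumption based on the columns listed above:

```python
from pyspark.sql import functions as F

# Keep only the export columns; the casts are illustrative
selected_weekly = sorted_weekly.select(
    F.col("country"),
    F.col("reported_year_week"),
    F.col("hospital_occupancy_count").cast("long"),
    F.col("icu_occupancy_count").cast("long"),
)
```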


🔷 e. WeeklySink (Sink)

  • What it does:

    • Writes the transformed weekly data to a target dataset.

    • Sink: ds_processed_hospital_admission_weekly

  • Why:

    • Makes weekly data available for reporting/analytics.
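Expressed as a PySpark sketch (the real sink is the ADF dataset ds_processed_hospital_admission_weekly; the path and format here are assumptions):

```python
# Write the weekly output; hypothetical target path and format
(
    selected_weekly.write
    .mode("overwrite")
    .parquet("/processed/hospital_admissions/weekly")
)
```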


🟩 4. Daily Branch

🔷 a. PivotDaily (Pivot)

  • What it does:

    • Similar to PivotWeekly, but operates on daily granularity.

  • Group by:

    • reported_date, country

  • Why:

    • Converts long-format daily data into a wide format for daily analysis.


🔷 b. SortDaily (Sort)

  • What it does:

    • Sorts by reported_date and country

  • Why:

    • Ensures consistent ordering in the final output.


🔷 c. SelectDaily (Select)

  • What it does:

    • Selects relevant fields like:

      • country, reported_date, hospital_occupancy_count, icu_occupancy_count, population, source

  • Why:

    • Aligns with the target schema and ensures only the required fields are exported.


🔷 d. DailySink (Sink)

  • What it does:

    • Writes the final daily data to ds_processed_hospital_admission_daily

  • Why:

    • Makes daily data available for downstream use (dashboards, exports).
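Because the daily branch mirrors the weekly one, a single compact PySpark sketch covers it end to end; column names, indicator values, and the output path are assumptions:

```python
from pyspark.sql import functions as F

daily_out = (
    daily                                             # Daily branch from the conditional split
    .groupBy("reported_date", "country")              # PivotDaily
    .pivot("indicator", ["hospital_occupancy_count", "icu_occupancy_count"])
    .agg(F.sum("value"))
    .orderBy("reported_date", "country")              # SortDaily
    .select("country", "reported_date",               # SelectDaily
            "hospital_occupancy_count", "icu_occupancy_count")
)

# DailySink: hypothetical target path for ds_processed_hospital_admission_daily
daily_out.write.mode("overwrite").parquet("/processed/hospital_admissions/daily")
```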

Transformation | Type | Description
ds_raw_hospital_admission | Source | Loads raw hospital admission data
fields2 | Conditional Split | Splits data into Daily and Weekly pipelines
JoinWithDate | Join | Adds weekly context by joining with the date dimension
PivotWeekly | Pivot | Converts indicator rows into columns (weekly)
SortWeekly | Sort | Sorts by week and country
SelectWeekly | Select | Keeps/renames columns for export
WeeklySink | Sink | Outputs to the weekly processed dataset
PivotDaily | Pivot | Converts indicator rows into columns (daily)
SortDaily | Sort | Sorts by date and country
SelectDaily | Select | Keeps/renames columns for export
DailySink | Sink | Outputs to the daily processed dataset
