|
| 1 | +from pyspark.sql.functions import ( |
| 2 | + col, |
| 3 | + concat_ws, |
| 4 | + lit, |
| 5 | +) |
| 6 | + |
| 7 | +from datacustomcode.client import Client |
| 8 | +from datacustomcode.io.writer.base import WriteMode |
| 9 | + |
| 10 | + |
| 11 | +def main(): |
| 12 | + client = Client() |
| 13 | + |
| 14 | + employees = client.read_dlo("Employee__dll").persist() |
| 15 | + employees = employees.select("id__c", "manager_id__c", "name__c", "position__c") |
| 16 | + employees.show() |
| 17 | + employees_with_manager = ( |
| 18 | + employees.alias("e") |
| 19 | + .join( |
| 20 | + employees.alias("m"), |
| 21 | + col("e.manager_id__c").cast("string") == col("m.id__c").cast("string"), |
| 22 | + "left", |
| 23 | + ) |
| 24 | + .select( |
| 25 | + col("e.id__c"), |
| 26 | + col("e.name__c"), |
| 27 | + col("e.position__c"), |
| 28 | + col("e.manager_id__c"), |
| 29 | + col("m.name__c").alias("manager_name__c"), |
| 30 | + ) |
| 31 | + .persist() |
| 32 | + ) |
| 33 | + |
| 34 | + hierarchy_df = ( |
| 35 | + employees_with_manager.filter(col("manager_id__c").isNull()) |
| 36 | + .withColumn("hierarchy_level__c", lit(1)) |
| 37 | + .withColumn("management_chain__c", col("name__c")) |
| 38 | + .persist() |
| 39 | + ) |
| 40 | + |
| 41 | + current_level = 1 |
| 42 | + |
| 43 | + while True: |
| 44 | + ewm = employees_with_manager.alias("ewm") |
| 45 | + hdf = hierarchy_df.filter(col("hierarchy_level__c") == current_level).alias( |
| 46 | + "hdf" |
| 47 | + ) |
| 48 | + |
| 49 | + next_level_df = ewm.join( |
| 50 | + hdf, |
| 51 | + col("ewm.manager_id__c").cast("string") == col("hdf.id__c").cast("string"), |
| 52 | + "inner", |
| 53 | + ).select( |
| 54 | + col("ewm.id__c"), |
| 55 | + col("ewm.name__c"), |
| 56 | + col("ewm.position__c"), |
| 57 | + col("ewm.manager_id__c"), |
| 58 | + col("ewm.manager_name__c"), |
| 59 | + (col("hdf.hierarchy_level__c") + 1).alias("hierarchy_level__c"), |
| 60 | + concat_ws(" | ", col("hdf.management_chain__c"), col("ewm.name__c")).alias( |
| 61 | + "management_chain__c" |
| 62 | + ), |
| 63 | + ) |
| 64 | + |
| 65 | + if next_level_df.isEmpty(): |
| 66 | + break |
| 67 | + |
| 68 | + hierarchy_df = hierarchy_df.union(next_level_df).persist() |
| 69 | + current_level += 1 |
| 70 | + |
| 71 | + hierarchy_df = hierarchy_df.orderBy("hierarchy_level__c", "manager_id__c", "id__c") |
| 72 | + |
| 73 | + dlo_name = "Employee_Hierarchy__dll" |
| 74 | + client.write_to_dlo(dlo_name, hierarchy_df, WriteMode.APPEND) |
| 75 | + |
| 76 | + |
| 77 | +if __name__ == "__main__": |
| 78 | + main() |
0 commit comments