diff --git a/src/main.py b/src/main.py
index 50c21af..0dd3060 100644
--- a/src/main.py
+++ b/src/main.py
@@ -14,6 +14,7 @@
 origins = [
     "http://localhost",
     "http://localhost:3000",
+    "http://localhost:3001",
 ]

 app.add_middleware(
@@ -49,7 +50,7 @@ async def create_upload_file(file: UploadFile, token: str):
 @app.post("/api/generate-ingest-files/")
 async def generate_ingest_files(token: str, data: RequestData):
     return csv_parser_utils.generate_ingest_files(
-        token, data.column_metadata.model_dump(), data.program_name, data.program_desc
+        token, data.column_metadata.model_dump(), data.program_name, data.program_desc, data.dimensions
     )

diff --git a/src/models.py b/src/models.py
index 4039798..9e982ac 100644
--- a/src/models.py
+++ b/src/models.py
@@ -1,9 +1,10 @@
-from typing import Dict
+from typing import Dict, List
 from pydantic import BaseModel, RootModel, field_validator, constr


 class ColumnMetadataItem(BaseModel):
     updated_col_name: str
+    type: str
     metric: bool
     dimension: bool

@@ -11,6 +12,24 @@ class ColumnMetadataItem(BaseModel):
 class ColumnMetadata(RootModel):
     root: Dict[str, ColumnMetadataItem]

+
+class DimensionFieldDataValueModel(BaseModel):
+    updated_col_name: str
+    type: str
+    metric: bool
+
+
+class DimensionFieldDataModel(BaseModel):
+    key: str
+    values: DimensionFieldDataValueModel
+
+
+class DimensionFieldModel(BaseModel):
+    value: str
+    isPrimary: bool
+    isIndex: bool
+    data: DimensionFieldDataModel
+
+
+class DimensionModel(BaseModel):
+    name: str
+    fields: List[DimensionFieldModel]
+

 class RequestData(BaseModel):
     program_name: constr(
@@ -21,6 +40,7 @@
     )
     program_desc: str
     column_metadata: ColumnMetadata
+    dimensions: List[DimensionModel]

     @field_validator("program_name")
     @classmethod
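For orientation, a request body that validates against the updated RequestData model could look like the sketch below. The program and column names are hypothetical, the import path assumes the models live in src/models.py, and the constr(...) pattern and field_validator on program_name are not visible in this diff, so real values must also satisfy those rules.

    # Hypothetical payload for POST /api/generate-ingest-files/
    from src.models import RequestData

    payload = {
        "program_name": "attendance",
        "program_desc": "Daily attendance counts",
        "column_metadata": {
            "district": {
                "updated_col_name": "district",
                "type": "object",  # pandas dtype recorded by guess_metrics_and_columns
                "metric": False,
                "dimension": True,
            },
            "count": {
                "updated_col_name": "count",
                "type": "int64",
                "metric": True,
                "dimension": False,
            },
        },
        "dimensions": [
            {
                "name": "district",
                "fields": [
                    {
                        "value": "district",  # key into column_metadata
                        "isPrimary": False,
                        "isIndex": False,
                        "data": {
                            "key": "district_id",
                            "values": {
                                "updated_col_name": "district",
                                "type": "object",
                                "metric": False,
                            },
                        },
                    }
                ],
            }
        ],
    }

    data = RequestData(**payload)  # pydantic v2 coerces the nested models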
diff --git a/src/utils/csv_parser_utils.py b/src/utils/csv_parser_utils.py
index 8dd5e9e..c1301d6 100644
--- a/src/utils/csv_parser_utils.py
+++ b/src/utils/csv_parser_utils.py
@@ -40,23 +40,18 @@ def guess_metrics_and_columns(token: str, filename: str):
     column_type_dict = dict()

     for column in df.columns:
-        if df[column].dtype == "int":
-            column_type_dict[df[column].name] = {
-                "updated_col_name": df[column].name,
-                "metric": True,
-                "dimension": False,
-            }
-        else:
-            column_type_dict[df[column].name] = {
-                "updated_col_name": df[column].name,
-                "metric": False,
-                "dimension": True,
-            }
+        # Roles are no longer guessed from dtype; the client decides. Record
+        # the dtype so later steps can map it to a grammar type.
+        column_type_dict[df[column].name] = {
+            "updated_col_name": df[column].name,
+            "type": str(df[column].dtype),
+            "metric": False,
+            "timeDimension": False,
+            "dimension": False,
+        }

     return column_type_dict


 def generate_ingest_files(
-    token: str, column_metadata: typing.Dict, program_name: str, program_desc: str
+    token: str, column_metadata: typing.Dict, program_name: str, program_desc: str, dimensions: typing.Iterable
 ):
     folder_path = os.path.join(TMP_BASE_PATH, token)
     file_path = os.path.join(
@@ -66,77 +61,152 @@
     df = pd.read_csv(file_path)

     df, column_mapping = format_df_columns(df, column_metadata)
-
-    metrics, dimensions = [], []
+    metrics, otherCols, dimensionFkCols = [], [], []
     for cols in column_metadata:
         updated_col_name = column_metadata[cols]["updated_col_name"]
         if column_metadata[cols]["metric"]:
             metrics.append(column_mapping[updated_col_name])
+        elif column_metadata[cols]["dimension"]:
+            # Dimension columns are handled below via the dimensions payload.
+            pass
         else:
-            dimensions.append(column_mapping[updated_col_name])
+            otherCols.append(cols)
+
+    # Derive the foreign-key column for each dimension: a composite dimension
+    # uses its indexed field, a single-field dimension gets a generated
+    # "<name>_id" column.
+    for dimension in dimensions:
+        if len(dimension.fields) > 1:
+            for field in dimension.fields:
+                if field.isIndex:
+                    data = {"key": field.data.key, "name": dimension.name}
+                    dimensionFkCols.append(data)
+                    break
+        else:
+            data = {"key": f"{dimension.name}_id", "name": dimension.name}
+            dimensionFkCols.append(data)

     ingest_folder_path = os.path.join(folder_path, "ingest")
-    write_dimensions_to_ingest_folder(df, dimensions, ingest_folder_path)
-    write_events_to_ingest_folder(
-        df, dimensions, metrics, program_name, ingest_folder_path
-    )
+    write_dimensions_to_ingest_folder(df, dimensions, column_metadata, ingest_folder_path)
     write_config_to_ingest_folder(program_name, program_desc, ingest_folder_path)
+    write_events_to_ingest_folder(program_name, df, metrics, dimensions, ingest_folder_path, dimensionFkCols, otherCols, column_metadata)

     return {"dimension": dimensions, "metrics": metrics}


+def getType(dataType: str):
+    # Map a pandas dtype string to a grammar type. pandas datetime dtypes
+    # carry a unit suffix (e.g. "datetime64[ns]"), so match on the prefix.
+    if dataType in ("int64", "float64"):
+        return "number"
+    elif dataType.startswith("datetime64"):
+        return "date"
+    # "object" and any unrecognised dtype fall back to string.
+    return "string"
+
+
 def write_dimensions_to_ingest_folder(
-    df: pd.DataFrame, dimensions: typing.Iterable, ingest_folder_path: str
+    df: pd.DataFrame, dimensions: typing.Iterable, column_metadata: typing.Dict, ingest_folder_path: str
 ):
     dimensions_base_path = os.path.join(ingest_folder_path, "dimensions")
     create_folder_if_not(dimensions_base_path)

     for dimension in dimensions:
-        dimension_grammar_data = f"""PK,Index
-string,string
-{dimension}_id,{dimension}"""
+        index_row, type_row, column_row, indexed_col, req_cols = [], [], [], [], []
+        if len(dimension.fields) > 1:
+            # Composite dimension: every field becomes a grammar column.
+            for field in dimension.fields:
+                if field.isPrimary:
+                    index_row.append("PK")
+                    indexed_col.append(column_metadata[field.value]["updated_col_name"])
+                elif field.isIndex:
+                    index_row.append("Index")
+                    indexed_col.append(column_metadata[field.value]["updated_col_name"])
+                else:
+                    index_row.append("")
+
+                type_row.append(getType(column_metadata[field.value]["type"]))
+                column_row.append(column_metadata[field.value]["updated_col_name"])
+                req_cols.append(column_metadata[field.value]["updated_col_name"])
+        else:
+            # Single-field dimension: generate a surrogate "<name>_id" PK
+            # and index the source column itself.
+            index_row.append("PK")
+            type_row.append("number")
+            column_row.append(f"{dimension.name}_id")
+
+            index_row.append("Index")
+            type_row.append(getType(column_metadata[dimension.fields[0].value]["type"]))
+            column_row.append(column_metadata[dimension.fields[0].value]["updated_col_name"])
+
+            indexed_col.append(column_metadata[dimension.fields[0].value]["updated_col_name"])
+            req_cols.append(column_metadata[dimension.fields[0].value]["updated_col_name"])
+
+        dimension_grammar_data = f"""{','.join(index_row)}
+{','.join(type_row)}
+{','.join(column_row)}
+"""
+
         with open(
-            os.path.join(dimensions_base_path, f"{dimension}-dimension.grammar.csv"),
+            os.path.join(dimensions_base_path, f"{dimension.name}-dimension.grammar.csv"),
             "w",
         ) as f:
             f.write(dimension_grammar_data)
-        column_df = pd.DataFrame(df[dimension].drop_duplicates(keep="first"))
-        column_df.insert(
-            loc=0, column=f"{dimension}_id", value=range(1, len(column_df) + 1)
-        )
-        column_df.to_csv(
-            os.path.join(dimensions_base_path, f"{dimension}-dimension.data.csv"),
+
+        df2 = df[req_cols].drop_duplicates(subset=indexed_col)
+        if len(dimension.fields) == 1:
+            df2.insert(
+                loc=0, column=f"{dimension.name}_id", value=range(1, len(df2) + 1)
+            )
+
+        df2.to_csv(
+            os.path.join(dimensions_base_path, f"{dimension.name}-dimension.data.csv"),
             index=False,
         )
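With the single-field district dimension from the hypothetical payload above (an object-typed, i.e. string, source column), write_dimensions_to_ingest_folder would emit a district-dimension.grammar.csv along these lines, alongside a district-dimension.data.csv whose deduplicated rows get a generated district_id running from 1 upwards:

    PK,Index
    number,string
    district_id,district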

 def write_events_to_ingest_folder(
-    df: pd.DataFrame, dimensions, metrics, program_name, ingest_folder_path
+    program_name: str, df: pd.DataFrame, metrics: typing.Iterable, dimensions: typing.Iterable, ingest_folder_path: str, dimensionFkCols: typing.Iterable, otherCols: typing.Iterable, column_metadata: typing.Dict
 ):
     events_base_path = os.path.join(ingest_folder_path, "programs", program_name)
     create_folder_if_not(events_base_path)

     for metric in metrics:
+        dimension_name_row, dimension_col_row, type_row, column_row, last_row = [], [], [], [], []
+        # Dimension foreign keys: columns present in the source CSV keep
+        # their metadata; generated surrogate keys default to number.
+        for dimensionFkCol in dimensionFkCols:
+            if column_metadata.get(dimensionFkCol["key"]) is not None:
+                columnName = column_metadata[dimensionFkCol["key"]]["updated_col_name"]
+                typeOfDimension = getType(column_metadata[dimensionFkCol["key"]]["type"])
+            else:
+                columnName = dimensionFkCol["key"]
+                typeOfDimension = "number"
+
+            dimension_name_row.append(dimensionFkCol["name"])
+            dimension_col_row.append(columnName)
+            type_row.append(typeOfDimension)
+            column_row.append(columnName)
+            last_row.append("dimension")
+
+        # Remaining columns: datetime columns are tagged as timeDimension.
+        for otherCol in otherCols:
+            dimension_name_row.append("")
+            dimension_col_row.append("")
+            type_row.append(getType(column_metadata[otherCol]["type"]))
+            column_row.append(column_metadata[otherCol]["updated_col_name"])
+            if column_metadata[otherCol]["type"].startswith("datetime64"):
+                last_row.append("timeDimension")
+            else:
+                last_row.append("")
+
+        # Finally the metric column itself.
+        dimension_name_row.append("")
+        dimension_col_row.append("")
+        type_row.append(getType(column_metadata[metric]["type"]))
+        column_row.append(column_metadata[metric]["updated_col_name"])
+        last_row.append("metric")
+
+        event_grammar = f"""{','.join(dimension_name_row)}
+{','.join(dimension_col_row)}
+{','.join(type_row)}
+{','.join(column_row)}
+{','.join(last_row)}
+"""
+
         with open(
             os.path.join(events_base_path, f"{metric}-event.grammar.csv"), "w"
         ) as f:
-            f.write("," + ",".join(dimensions) + "," + "\n")
-            f.write("," + ",".join(dimensions) + "," + "\n")
-            f.write("date," + "string," * (len(dimensions)) + "integer" "\n")
-            f.write("date," + ",".join(dimensions) + f",{metric}" + "\n")
-            f.write(
-                "timeDimension," + "dimension," * (len(dimensions)) + "metric" + "\n"
-            )
-        headers = dimensions + [metric]
-        event_df = pd.DataFrame(df[headers])
-        event_df.insert(
-            loc=0, column=f"date", value=datetime.today().strftime("%d/%m/%y")
-        )
-        event_df.to_csv(
-            os.path.join(events_base_path, f"{metric}-event.data.csv"), index=False
-        )
+            f.write(event_grammar)


 def write_config_to_ingest_folder(
@@ -204,5 +274,5 @@ def fetch_file_content(token: str, filename: str):
     if not os.path.exists(file_path):
         return None
     df = pd.read_csv(file_path)
-    json_response = df.head().to_json(orient="records")
+    json_response = df.to_json(orient="records")
     return json_response
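Putting the event-writer changes together: for the hypothetical count metric, with the generated district_id foreign key (absent from column_metadata, so it defaults to number) and a datetime64[ns] column named date, write_events_to_ingest_folder would emit a count-event.grammar.csv along these lines, assuming format_df_columns leaves the metric's name unchanged:

    district,,
    district_id,,
    number,date,number
    district_id,date,count
    dimension,timeDimension,metric

Note that the event data CSV is no longer written here; only the grammar file is produced by this function now.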