fix: bump pandas version limit (#2254)
* bump pandas version limit

Signed-off-by: cmuhao <[email protected]>

* update feast

Signed-off-by: cmuhao <[email protected]>

* update feast to 0.26.0 to include python 3.8

Signed-off-by: cmuhao <[email protected]>

* revert feast version change

Signed-off-by: cmuhao <[email protected]>

* update feast version

Signed-off-by: cmuhao <[email protected]>

* update feast test

Signed-off-by: cmuhao <[email protected]>

* update feast test

Signed-off-by: cmuhao <[email protected]>

* update feast test

Signed-off-by: cmuhao <[email protected]>

* update numpy

Signed-off-by: cmuhao <[email protected]>

* fix time difference

Signed-off-by: cmuhao <[email protected]>

* remove blank line EOF

Signed-off-by: cmuhao <[email protected]>

* remove blank line EOF

Signed-off-by: cmuhao <[email protected]>

* ignore type change as mypy bumped

Signed-off-by: cmuhao <[email protected]>

* fix isort lint

Signed-off-by: cmuhao <[email protected]>

---------

Signed-off-by: cmuhao <[email protected]>
HaoXuAI authored Jul 18, 2024
1 parent 0bef179 commit 74212e2
Showing 15 changed files with 78 additions and 65 deletions.
2 changes: 1 addition & 1 deletion databuilder/databuilder/extractor/feast_extractor.py
@@ -81,7 +81,7 @@ def _extract_feature_view(
                ColumnMetadata(
                    feature.name,
                    None,
-                    feature.dtype.name,
+                    feature.dtype.name,  # type: ignore
                    len(feature_view.entities) + index,
                )
            )
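The ignore on `feature.dtype.name` tracks a typing change in feast: by 0.34, `FeatureView.features` yields `feast.Field` objects whose `dtype` is declared as the `FeastType` union, so mypy cannot prove `.name` exists on it even though it does at runtime. A minimal sketch of the runtime behavior the extractor appears to rely on (assuming feast 0.34; the enum detail is an assumption, not something this diff states):

```python
# Sketch only: feast's primitive types presumably behave like Enum members
# at runtime, so `.name` yields the strings the updated test expects, while
# mypy only sees the wider FeastType union, hence the `# type: ignore`.
from feast.types import Float32, Int64

print(Float32.name)  # expected: "FLOAT32"
print(Int64.name)    # expected: "INT64"
```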
databuilder/databuilder/extractor/kafka_schema_registry_extractor.py
@@ -71,7 +71,7 @@ def extract(self) -> Union[TableMetadata, None]:
    def get_scope(self) -> str:
        return 'extractor.kafka_schema_registry'

-    def _get_extract_iter(self) -> Optional[Iterator[TableMetadata]]:
+    def _get_extract_iter(self) -> Optional[Iterator[TableMetadata]]:  # type: ignore
        """
        Return an iterator generating TableMetadata for all of the schemas.
        """
4 changes: 2 additions & 2 deletions databuilder/databuilder/extractor/salesforce_extractor.py
@@ -61,7 +61,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
        # Filter the sobjects if `OBJECT_NAMES_KEY` is set otherwise return all
        sobjects = [
            sobject
-            for sobject in self._client.describe()["sobjects"]
+            for sobject in self._client.describe()["sobjects"]  # type: ignore
            if (len(self._object_names) == 0 or sobject["name"] in self._object_names)
        ]

@@ -71,7 +71,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
f"({i+1}/{len(sobjects)}) Extracting SalesForce object ({object_name})"
)
data = self._client.restful(path=f"sobjects/{object_name}/describe")
yield self._extract_table_metadata(object_name=object_name, data=data)
yield self._extract_table_metadata(object_name=object_name, data=data) # type: ignore

def _extract_table_metadata(
self, object_name: str, data: Dict[str, Any]
6 changes: 3 additions & 3 deletions databuilder/databuilder/models/dashboard/dashboard_chart.py
@@ -142,11 +142,11 @@ def _create_record_iterator(self) -> Iterator[RDSModel]:
            )
        )
        if self._chart_name:
-            record.name = self._chart_name
+            record.name = self._chart_name  # type: ignore
        if self._chart_type:
-            record.type = self._chart_type
+            record.type = self._chart_type  # type: ignore
        if self._chart_url:
-            record.url = self._chart_url
+            record.url = self._chart_url  # type: ignore

        yield record

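The `# type: ignore` comments added here and in the dashboard, query, and user models below all silence the same complaint: the records are declarative SQLAlchemy models from amundsen-rds, and the bumped mypy sees the class-level `Column` descriptor rather than the Python value being assigned. A hypothetical minimal model (not the actual amundsen-rds definition) that reproduces the error:

```python
# Hypothetical sketch of the mypy error behind these ignores.
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Chart(Base):  # stand-in for the amundsen-rds chart model
    __tablename__ = "chart"
    rk = Column(String(128), primary_key=True)
    name = Column(String(128))

record = Chart(rk="chart_key")
# Fine at runtime (the descriptor intercepts the assignment), but without a
# SQLAlchemy mypy plugin the attribute is typed Column[str], not str:
record.name = "test_chart"  # mypy: incompatible types -> # type: ignore
```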
databuilder/databuilder/models/dashboard/dashboard_metadata.py
@@ -387,7 +387,7 @@ def _create_record_iterator(self) -> Iterator[RDSModel]:
            cluster_rk=self._get_cluster_key()
        )
        if self.dashboard_group_url:
-            dashboard_group_record.dashboard_group_url = self.dashboard_group_url
+            dashboard_group_record.dashboard_group_url = self.dashboard_group_url  # type: ignore

        yield dashboard_group_record

@@ -406,10 +406,10 @@ def _create_record_iterator(self) -> Iterator[RDSModel]:
            dashboard_group_rk=self._get_dashboard_group_key()
        )
        if self.created_timestamp:
-            dashboard_record.created_timestamp = self.created_timestamp
+            dashboard_record.created_timestamp = self.created_timestamp  # type: ignore

        if self.dashboard_url:
-            dashboard_record.dashboard_url = self.dashboard_url
+            dashboard_record.dashboard_url = self.dashboard_url  # type: ignore

        yield dashboard_record

4 changes: 2 additions & 2 deletions databuilder/databuilder/models/dashboard/dashboard_query.py
@@ -128,9 +128,9 @@ def _create_record_iterator(self) -> Iterator[RDSModel]:
            )
        )
        if self._url:
-            record.url = self._url
+            record.url = self._url  # type: ignore
        if self._query_text:
-            record.query_text = self._query_text
+            record.query_text = self._query_text  # type: ignore

        yield record

4 changes: 2 additions & 2 deletions databuilder/databuilder/models/user.py
@@ -197,10 +197,10 @@ def get_user_record(self) -> RDSModel:
        # or the flag allows to update empty values
        for attr, value in record_attr_map.items():
            if value or not self.do_not_update_empty_attribute:
-                record.__setattr__(attr.key, value)
+                record.__setattr__(attr.key, value)  # type: ignore

        if self.manager_email:
-            record.manager_rk = self.get_user_model_key(email=self.manager_email)
+            record.manager_rk = self.get_user_model_key(email=self.manager_email)  # type: ignore

        return record

6 changes: 3 additions & 3 deletions databuilder/databuilder/publisher/mysql_csv_publisher.py
@@ -97,7 +97,7 @@ def _sort_record_files(self, files: List[str]) -> List[str]:
        :param files:
        :return:
        """
-        sorted_table_names = [table.name for table in Base.metadata.sorted_tables]
+        sorted_table_names = [table.name for table in Base.metadata.sorted_tables]  # type: ignore
        return sorted(files, key=lambda file: sorted_table_names.index(self._get_table_name_from_file(file)))

    def _get_table_name_from_file(self, file: str) -> str:
@@ -187,8 +187,8 @@ def _create_record(self, model: Type[RDSModel], record_dict: Dict) -> RDSModel:
        :return:
        """
        record = model(**record_dict)
-        record.published_tag = self._publish_tag
-        record.publisher_last_updated_epoch_ms = int(time.time() * 1000)
+        record.published_tag = self._publish_tag  # type: ignore
+        record.publisher_last_updated_epoch_ms = int(time.time() * 1000)  # type: ignore
        return record

    def _execute(self, session: Session) -> None:
12 changes: 6 additions & 6 deletions databuilder/databuilder/task/mysql_staleness_removal_task.py
@@ -50,11 +50,11 @@ class MySQLStalenessRemovalTask(Task):
    MIN_MS_TO_EXPIRE = "minimum_milliseconds_to_expire"

    _DEFAULT_CONFIG = ConfigFactory.from_dict({STALENESS_MAX_PCT: 5,
-                                               TARGET_TABLES: [],
-                                               STALENESS_PCT_MAX_DICT: {},
-                                               MIN_MS_TO_EXPIRE: 86400000,
-                                               DRY_RUN: False,
-                                               ENGINE_ECHO: False})
+                                              TARGET_TABLES: [],
+                                              STALENESS_PCT_MAX_DICT: {},
+                                              MIN_MS_TO_EXPIRE: 86400000,
+                                              DRY_RUN: False,
+                                              ENGINE_ECHO: False})

    def get_scope(self) -> str:
        return 'task.mysql_remove_stale_data'
@@ -121,7 +121,7 @@ def run(self) -> None:
        referenced tables data which will be deleted in a cascade delete)
        :return:
        """
-        sorted_table_names = [table.name for table in Base.metadata.sorted_tables]
+        sorted_table_names = [table.name for table in Base.metadata.sorted_tables]  # type: ignore
        sorted_target_tables = sorted(
            self.target_tables, key=lambda table: sorted_table_names.index(table), reverse=True)
        try:
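Both the publisher and this task depend on `Base.metadata.sorted_tables`, which SQLAlchemy orders by foreign-key dependency: publishing walks the list forward so parent rows exist before children, and staleness removal walks it in reverse so child rows go first and cascading deletes never leave dangling references. A self-contained illustration of that guarantee (table names are made up):

```python
# Sketch: MetaData.sorted_tables is topologically sorted by FK dependency.
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table

metadata = MetaData()
parent = Table("parent", metadata, Column("id", Integer, primary_key=True))
child = Table(
    "child",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("parent_id", Integer, ForeignKey("parent.id")),
)

insert_order = [t.name for t in metadata.sorted_tables]  # ['parent', 'child']
delete_order = list(reversed(insert_order))              # ['child', 'parent']
print(insert_order, delete_order)
```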
3 changes: 2 additions & 1 deletion databuilder/requirements.txt
@@ -22,9 +22,10 @@ unicodecsv>=0.14.1,<1.0
httplib2>=0.18.0
text-unidecode>=1.3
Jinja2>=2.10.0,<4
-pandas>=0.21.0,<1.5.0
+pandas>=0.21.0,<=2.2.2
responses>=0.10.6
jsonref==0.2
+numpy<2.0

amundsen-common>=0.16.0
amundsen-rds==0.0.8
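Raising the pandas ceiling from `<1.5.0` to `<=2.2.2` is a major-version jump, and the new `numpy<2.0` pin plus the "fix time difference" commit above suggest some 2.x behavior changes had to be absorbed. One representative break when crossing 2.0 (an illustrative example, not necessarily the one this PR hit):

```python
# pandas 2.0 removed DataFrame.append (deprecated since 1.4);
# pd.concat is the drop-in replacement.
import pandas as pd

df = pd.DataFrame({"a": [1]})
row = pd.DataFrame({"a": [2]})

# pandas < 2.0: df = df.append(row, ignore_index=True)
df = pd.concat([df, row], ignore_index=True)
print(df)
```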
5 changes: 2 additions & 3 deletions databuilder/setup.py
@@ -69,9 +69,8 @@
]

feast = [
-    'feast==0.17.0',
-    'fastapi!=0.76.*',
-    'protobuf<=3.20.1'
+    'feast==0.34.0',
+    'dask[dataframe]<=2024.5.0',
]

atlas = [
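The feast extra jumps from 0.17.0 to 0.34.0, and the fastapi/protobuf pins give way to a dask cap, presumably because feast 0.34 pulls dask in for its offline store. After installing the extra, the resolved pins can be sanity-checked in one go (a sketch, not part of the change):

```python
# Print the versions pip actually resolved for the newly pinned packages.
from importlib.metadata import version

for pkg in ("feast", "dask", "pandas", "numpy"):
    print(pkg, version(pkg))
```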
30 changes: 20 additions & 10 deletions databuilder/tests/unit/extractor/test_feast_extractor.py
@@ -37,8 +37,8 @@ def test_feature_view_extraction(self) -> None:
                ColumnMetadata(
                    "driver_id", "Internal identifier of the driver", "INT64", 0
                ),
-                ColumnMetadata("conv_rate", None, "FLOAT", 1),
-                ColumnMetadata("acc_rate", None, "FLOAT", 2),
+                ColumnMetadata("conv_rate", None, "FLOAT32", 1),
+                ColumnMetadata("acc_rate", None, "FLOAT32", 2),
                ColumnMetadata("avg_daily_trips", None, "INT64", 3),
            ],
        )
@@ -70,11 +70,12 @@ def test_feature_table_extraction_with_description_batch(self) -> None:
        expected = DescriptionMetadata(
            TestFeastExtractor._strip_margin(
                f"""```
+                |name: "driver_hourly_stats_batch_source"
                |type: BATCH_FILE
-                |event_timestamp_column: "event_timestamp"
+                |timestamp_field: "event_timestamp"
                |created_timestamp_column: "created"
                |file_options {"{"}
-                |  file_url: "{root_tests_path}/resources/extractor/feast/fs/data/driver_stats.parquet"
+                |  uri: "{root_tests_path}/resources/extractor/feast/fs/data/driver_stats.parquet"
                |{"}"}
                |```"""
            ),
@@ -107,11 +108,12 @@ def test_feature_table_extraction_with_description_stream(self) -> None:
        expected = DescriptionMetadata(
            TestFeastExtractor._strip_margin(
                f"""```
+                |name: "driver_hourly_stats_batch_source"
                |type: BATCH_FILE
-                |event_timestamp_column: "event_timestamp"
+                |timestamp_field: "event_timestamp"
                |created_timestamp_column: "created"
                |file_options {"{"}
-                |  file_url: "{root_tests_path}/resources/extractor/feast/fs/data/driver_stats.parquet"
+                |  uri: "{root_tests_path}/resources/extractor/feast/fs/data/driver_stats.parquet"
                |{"}"}
                |```"""
            ),
@@ -135,19 +137,27 @@ def test_feature_table_extraction_with_description_stream(self) -> None:
        expected = DescriptionMetadata(
            TestFeastExtractor._strip_margin(
                """```
+                |name: "driver_hourly_stats"
-                |type: STREAM_KAFKA
-                |event_timestamp_column: "datetime"
-                |created_timestamp_column: "datetime"
+                |timestamp_field: "datetime"
+                |batch_source {{
+                |  name: "driver_hourly_stats_batch_source"
+                |  type: BATCH_FILE
+                |  timestamp_field: "event_timestamp"
+                |  created_timestamp_column: "created"
+                |  file_options {{
+                |    uri: "{root_tests_path}/resources/extractor/feast/fs/data/driver_stats.parquet"
+                |  }}
+                |}}
                |kafka_options {{
                |  bootstrap_servers: "broker1"
                |  topic: "driver_hourly_stats"
                |  message_format {{
                |    avro_format {{
                |      schema_json: "{schema_json}"
                |    }}
                |  }}
                |}}
-                |```""").format(schema_json=schema_json),
+                |```""").format(root_tests_path=root_tests_path, schema_json=schema_json),
            "stream_source",
        )
        print(stream_source.description.__repr__())
databuilder/tests/unit/extractor/test_mysql_search_data_extractor.py
@@ -52,8 +52,9 @@ def test_table_search(self,
        table.schema = schema

        table.description = TableDescription(rk='test_table_description_key', description='test_table_description')
-        table.programmatic_descriptions = [TableProgrammaticDescription(rk='test_table_prog_description_key',
-                                                                        description='test_table_prog_description')]
+        table.programmatic_descriptions = [
+            TableProgrammaticDescription(rk='test_table_prog_description_key',  # type: ignore
+                                         description='test_table_prog_description')]

        table.timestamp = TableTimestamp(rk='test_table_timestamp_key', last_updated_timestamp=123456789)

@@ -64,17 +65,17 @@ def test_table_search(self,
                                        description='test_col1_description')
        column2.description = ColumnDescription(rk='test_col2_description_key',
                                                description='test_col2_description')
-        table.columns = [column1, column2, column3]
+        table.columns = [column1, column2, column3]  # type: ignore

        usage1 = TableUsage(user_rk='test_user1_key', table_rk='test_table_key', read_count=5)
        usage2 = TableUsage(user_rk='test_user2_key', table_rk='test_table_key', read_count=10)
-        table.usage = [usage1, usage2]
+        table.usage = [usage1, usage2]  # type: ignore

        tags = [Tag(rk='test_tag', tag_type='default')]
-        table.tags = tags
+        table.tags = tags  # type: ignore

        badges = [Badge(rk='test_badge')]
-        table.badges = badges
+        table.badges = badges  # type: ignore

        tables = [table]

@@ -198,21 +199,21 @@ def test_dashboard_search(self,
        group.cluster = cluster

        last_exec = DashboardExecution(rk='test_dashboard_exec_key/_last_successful_execution', timestamp=123456789)
-        dashboard.execution = [last_exec]
+        dashboard.execution = [last_exec]  # type: ignore

        usage1 = DashboardUsage(user_rk='test_user1_key', dashboard_rk='test_dashboard_key', read_count=10)
        usage2 = DashboardUsage(user_rk='test_user2_key', dashboard_rk='test_dashboard_key', read_count=5)
-        dashboard.usage = [usage1, usage2]
+        dashboard.usage = [usage1, usage2]  # type: ignore

        query = DashboardQuery(rk='test_query_key', name='test_query')
-        query.charts = [DashboardChart(rk='test_chart_key', name='test_chart')]
-        dashboard.queries = [query]
+        query.charts = [DashboardChart(rk='test_chart_key', name='test_chart')]  # type: ignore
+        dashboard.queries = [query]  # type: ignore

        tags = [Tag(rk='test_tag', tag_type='default')]
-        dashboard.tags = tags
+        dashboard.tags = tags  # type: ignore

        badges = [Badge(rk='test_badge')]
-        dashboard.badges = badges
+        dashboard.badges = badges  # type: ignore

        dashboards = [dashboard]

databuilder/tests/unit/resources/extractor/feast/fs/feature_store.yaml
@@ -2,4 +2,5 @@ project: fs
registry: data/registry.db
provider: local
online_store:
-  path: data/online_store.db
+  path: data/online_store.db
+entity_key_serialization_version: 2
31 changes: 16 additions & 15 deletions databuilder/tests/unit/resources/extractor/feast/fs/feature_view.py
@@ -3,30 +3,31 @@

import pathlib
import re
-from datetime import datetime
+from datetime import datetime, timedelta

from feast import (
-    Entity, Feature, FeatureView, FileSource, KafkaSource, ValueType,
+    Entity, FeatureView, Field, FileSource, KafkaSource, ValueType,
)
from feast.data_format import AvroFormat
-from google.protobuf.duration_pb2 import Duration
+from feast.types import Float32, Int64

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.

root_path = pathlib.Path(__file__).parent.resolve()
driver_hourly_stats = FileSource(
+    name="driver_hourly_stats_batch_source",
    path=f"{root_path}/data/driver_stats.parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_hourly_stats_kafka_source = KafkaSource(
    bootstrap_servers="broker1",
-    event_timestamp_column="datetime",
-    created_timestamp_column="datetime",
+    name="driver_hourly_stats",
+    timestamp_field="datetime",
    topic="driver_hourly_stats",
+    batch_source=driver_hourly_stats,
    message_format=AvroFormat(
        schema_json=re.sub(
            "\n[ \t]*\\|",
@@ -46,6 +47,7 @@
# fetch features.
driver = Entity(
    name="driver_id",
+    join_keys=["driver_id"],
    value_type=ValueType.INT64,
    description="Internal identifier of the driver",
)
@@ -55,16 +57,15 @@
# data to our model online.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
-    entities=["driver_id"],
-    ttl=Duration(seconds=86400 * 1),
-    features=[
-        Feature(name="conv_rate", dtype=ValueType.FLOAT),
-        Feature(name="acc_rate", dtype=ValueType.FLOAT),
-        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
+    entities=[driver],
+    ttl=timedelta(seconds=8640000000),
+    schema=[
+        Field(name="conv_rate", dtype=Float32),
+        Field(name="acc_rate", dtype=Float32),
+        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
-    stream_source=driver_hourly_stats_kafka_source,
-    batch_source=driver_hourly_stats,
+    source=driver_hourly_stats_kafka_source,
    tags={"is_pii": "true"},
)

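Taken together, the fixture changes are the standard feast migration: `Feature`/`features=` become `Field`/`schema=`, the protobuf `Duration` TTL becomes a plain `timedelta`, entities are passed as objects with explicit `join_keys`, and the separate `stream_source`/`batch_source` arguments collapse into a single `source` whose Kafka source carries its batch source. Registering the fixture then looks roughly like this (a sketch assuming feast 0.34 and the `feature_store.yaml` shown above):

```python
# Sketch: apply the fixture to the local registry and list what the
# extractor will see. Reuses root_path, driver and driver_hourly_stats_view
# from the module above.
from feast import FeatureStore

store = FeatureStore(repo_path=str(root_path))
store.apply([driver, driver_hourly_stats_view])
for fv in store.list_feature_views():
    print(fv.name, [field.name for field in fv.features])
```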
