
Commit eda0e75

Mechanics for backfilling missing BigQuery metadata (#17429)
* Add MissingDatasetFile
* Add processed column to MissingDatasetFile
* Modify sync_bigquery_release_files to use MissingDatasetFile
* Run the task every minute
* Make processed column nullable and default to null
* Only process unprocessed files
* Run every 5 minutes instead
* Fix a bug
* Give tests access to the transaction manager
* Commit early via the transaction manager
* Add the objects back into the session
* Squash migrations
1 parent 5ae94d8 commit eda0e75

File tree

7 files changed: +152 −157 lines changed
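
The squashed commit messages above describe the whole mechanism: files missing from the BigQuery dataset are recorded as MissingDatasetFile rows, and the periodic sync task drains the unprocessed ones. A minimal sketch of that flow, assuming a hypothetical serialize_row() helper and the gcloud.bigquery service name used in the tests (the real implementation is warehouse.packaging.tasks.sync_bigquery_release_files and may differ in batching and ordering):

from warehouse.packaging.models import MissingDatasetFile


def backfill_missing_files(request):
    # Hypothetical outline only; see sync_bigquery_release_files for the
    # actual task this commit modifies.
    bigquery = request.find_service(name="gcloud.bigquery")
    table = request.registry.settings["warehouse.release_files_table"]

    # "Only process unprocessed files": processed stays NULL until handled.
    missing = (
        request.db.query(MissingDatasetFile)
        .filter(MissingDatasetFile.processed.is_(None))
        .all()
    )

    for entry in missing:
        # serialize_row is a hypothetical stand-in for building the JSON
        # row for entry.file against the schema from get_table().
        bigquery.insert_rows_json(table=table, json_rows=[serialize_row(entry.file)])
        entry.processed = True

        # "Commit early via the transaction manager" and "Add the objects
        # back into the session": committing persists progress but detaches
        # ORM objects, so re-add them to the fresh transaction's session.
        request.tm.commit()
        request.tm.begin()
        request.db.add(entry)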

tests/conftest.py

+2 −2
@@ -668,8 +668,9 @@ def query_recorder(app_config):


 @pytest.fixture
-def db_request(pyramid_request, db_session):
+def db_request(pyramid_request, db_session, tm):
     pyramid_request.db = db_session
+    pyramid_request.tm = tm
     pyramid_request.flags = admin.flags.Flags(pyramid_request)
     pyramid_request.banned = admin.bans.Bans(pyramid_request)
     pyramid_request.organization_access = True
@@ -734,7 +735,6 @@ def tm():
     # Create a new transaction manager for dependant test cases
     tm = transaction.TransactionManager(explicit=True)
     tm.begin()
-    tm.doom()

     yield tm
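
Removing tm.doom() matters because a doomed transaction can never commit; with it in place, the task's new commit-early behavior would fail under test. A small standalone illustration of what doom() does, using the transaction package the fixture already relies on:

import transaction

tm = transaction.TransactionManager(explicit=True)
tm.begin()
tm.doom()  # the transaction still accepts work, but is marked un-committable
try:
    tm.commit()  # raises, because doomed transactions refuse to commit
except transaction.interfaces.DoomedTransaction:
    tm.abort()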

tests/unit/packaging/test_init.py

+6 −5
@@ -27,8 +27,9 @@
 )
 from warehouse.packaging.models import AlternateRepository, File, Project, Release, Role
 from warehouse.packaging.services import project_service_factory
-from warehouse.packaging.tasks import (  # sync_bigquery_release_files,
+from warehouse.packaging.tasks import (
     check_file_cache_tasks_outstanding,
+    sync_bigquery_release_files,
     update_description_html,
 )
 from warehouse.rate_limiting import IRateLimiter, RateLimit
@@ -169,10 +170,10 @@ def key_factory(keystring, iterate_on=None, if_attr_exists=None):
     ]

     if with_bq_sync:
-        # assert (
-        #     pretend.call(crontab(minute=0), sync_bigquery_release_files)
-        #     in config.add_periodic_task.calls
-        # )
+        assert (
+            pretend.call(crontab(minute="*/5"), sync_bigquery_release_files)
+            in config.add_periodic_task.calls
+        )
         pass

     assert (
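
The asserted schedule tracks the commit history's back-and-forth on cadence ("Run the task every minute", then "Run every 5 minutes instead"). With celery.schedules.crontab the difference reads as:

from celery.schedules import crontab

crontab(minute=0)      # top of every hour: the old, commented-out schedule
crontab(minute="*/5")  # every five minutes: the schedule asserted here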

tests/unit/packaging/test_tasks.py

+9 −60
@@ -13,7 +13,6 @@
 import tempfile

 from contextlib import contextmanager
-from itertools import product

 import pretend
 import pytest
@@ -24,7 +23,7 @@
 import warehouse.packaging.tasks

 from warehouse.accounts.models import WebAuthn
-from warehouse.packaging.models import Description
+from warehouse.packaging.models import Description, MissingDatasetFile
 from warehouse.packaging.tasks import (
     check_file_cache_tasks_outstanding,
     compute_2fa_metrics,
@@ -734,19 +733,18 @@ def test_sync_rows(
         DependencyFactory.create(release=release, kind=2)
         DependencyFactory.create(release=release, kind=3)
         DependencyFactory.create(release=release, kind=4)
-        load_config = pretend.call_recorder(lambda *a, **kw: None)
-        monkeypatch.setattr("warehouse.packaging.tasks.LoadJobConfig", load_config)
+        missing = MissingDatasetFile(file_id=release_file.id)
+        db_request.db.add(missing)

         query = pretend.stub(
             result=pretend.call_recorder(
                 lambda *a, **kw: [{"md5_digest": release_file2.md5_digest}]
             )
         )
         get_table = pretend.stub(schema=bq_schema)
-        get_result = pretend.stub(result=lambda: None)
         bigquery = pretend.stub(
             get_table=pretend.call_recorder(lambda t: get_table),
-            load_table_from_json=pretend.call_recorder(lambda *a, **kw: get_result),
+            insert_rows_json=pretend.call_recorder(lambda *a, **kw: None),
             query=pretend.call_recorder(lambda q: query),
         )

@@ -765,17 +763,11 @@ def find_service(name=None):

         assert db_request.find_service.calls == [pretend.call(name="gcloud.bigquery")]
         assert bigquery.get_table.calls == expected_get_table_calls
-        assert bigquery.query.calls == [
-            pretend.call(query.format(table))
-            for table in release_files_table.split()
-            for query in [
-                "SELECT md5_digest FROM {} WHERE md5_digest LIKE 'ff%'",
-                "SELECT md5_digest FROM {} WHERE md5_digest LIKE 'fe%'",
-            ]
-        ]
-        assert bigquery.load_table_from_json.calls == [
+        assert bigquery.query.calls == []
+        assert bigquery.insert_rows_json.calls == [
             pretend.call(
-                [
+                table=table,
+                json_rows=[
                     {
                         "metadata_version": None,
                         "name": project.name,
@@ -818,54 +810,11 @@ def find_service(name=None):
                         "blake2_256_digest": release_file.blake2_256_digest,
                     },
                 ],
-                table,
-                job_config=None,
             )
             for table in release_files_table.split()
         ]

-    @pytest.mark.parametrize("bq_schema", [bq_schema])
-    def test_no_diff(self, db_request, monkeypatch, bq_schema):
-        project = ProjectFactory.create()
-        release = ReleaseFactory.create(project=project)
-        release_file = FileFactory.create(
-            release=release, filename=f"foobar-{release.version}.tar.gz"
-        )
-
-        query = pretend.stub(
-            result=pretend.call_recorder(
-                lambda *a, **kw: [{"md5_digest": release_file.md5_digest}]
-            )
-        )
-        get_table = pretend.stub(schema=bq_schema)
-        bigquery = pretend.stub(
-            get_table=pretend.call_recorder(lambda t: get_table),
-            query=pretend.call_recorder(lambda q: query),
-        )
-
-        @pretend.call_recorder
-        def find_service(name=None):
-            if name == "gcloud.bigquery":
-                return bigquery
-            raise LookupError
-
-        db_request.find_service = find_service
-        db_request.registry.settings = {
-            "warehouse.release_files_table": "example.pypi.distributions"
-        }
-
-        sync_bigquery_release_files(db_request)
-
-        assert db_request.find_service.calls == [pretend.call(name="gcloud.bigquery")]
-        assert bigquery.get_table.calls == [pretend.call("example.pypi.distributions")]
-        assert bigquery.query.calls == [
-            pretend.call(
-                "SELECT md5_digest "
-                "FROM example.pypi.distributions "
-                f"WHERE md5_digest LIKE '{first}{second}%'",
-            )
-            for first, second in product("fedcba9876543210", repeat=2)
-        ]
+        assert missing.processed

     def test_var_is_none(self):
         request = pretend.stub(
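
The stub swap mirrors a real change of google-cloud-bigquery client APIs: load_table_from_json starts a batch load job (hence the deleted LoadJobConfig monkeypatch and the .result() stub), while insert_rows_json performs a streaming insert with no job to wait on, and the per-prefix SELECT md5_digest reconciliation queries disappear entirely. Both calls below exist on the real client; the table name and row are illustrative:

from google.cloud import bigquery

client = bigquery.Client()
row = {"md5_digest": "abc123"}  # abbreviated, illustrative row

# Old path: a batch load job that must be configured and awaited.
job = client.load_table_from_json([row], "example.pypi.distributions")
job.result()

# New path: a streaming insert; returns a list of per-row errors, if any.
errors = client.insert_rows_json(table="example.pypi.distributions", json_rows=[row])
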
warehouse/migrations/versions/77d52a945a5f_add_missingdatasetfile.py

+45
@@ -0,0 +1,45 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Add MissingDatasetFile
+
+Revision ID: 77d52a945a5f
+Revises: 12a43f12cc18
+Create Date: 2025-01-17 16:56:09.082853
+"""
+
+import sqlalchemy as sa
+
+from alembic import op
+
+revision = "77d52a945a5f"
+down_revision = "12a43f12cc18"
+
+
+def upgrade():
+    op.create_table(
+        "missing_dataset_files",
+        sa.Column("file_id", sa.UUID(), nullable=False),
+        sa.Column("processed", sa.Boolean(), nullable=True),
+        sa.Column(
+            "id", sa.UUID(), server_default=sa.text("gen_random_uuid()"), nullable=False
+        ),
+        sa.ForeignKeyConstraint(
+            ["file_id"],
+            ["release_files.id"],
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+
+
+def downgrade():
+    op.drop_table("missing_dataset_files")

warehouse/packaging/__init__.py

+3 −3
@@ -30,6 +30,7 @@
     check_file_cache_tasks_outstanding,
     compute_2fa_metrics,
     compute_packaging_metrics,
+    sync_bigquery_release_files,
     update_description_html,
 )
 from warehouse.rate_limiting import IRateLimiter, RateLimit
@@ -197,6 +198,5 @@ def includeme(config):
     # Add a periodic task to generate general metrics
     config.add_periodic_task(crontab(minute="*/5"), compute_packaging_metrics)

-    # TODO: restore this
-    # if config.get_settings().get("warehouse.release_files_table"):
-    #     config.add_periodic_task(crontab(minute=0), sync_bigquery_release_files)
+    if config.get_settings().get("warehouse.release_files_table"):
+        config.add_periodic_task(crontab(minute="*/5"), sync_bigquery_release_files)

warehouse/packaging/models.py

+8
@@ -1133,3 +1133,11 @@ class AlternateRepository(db.Model):
     name: Mapped[str]
     url: Mapped[str]
     description: Mapped[str]
+
+
+class MissingDatasetFile(db.Model):
+    __tablename__ = "missing_dataset_files"
+
+    file_id: Mapped[UUID] = mapped_column(ForeignKey("release_files.id"))
+    file: Mapped[File] = orm.relationship()
+    processed: Mapped[bool] = mapped_column(default=None, nullable=True)
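
Because processed is nullable with a default of None, the column is effectively tri-state: NULL for rows the task has not touched, True or False once it has. A hedged sketch of the selection the commit message implies ("Only process unprocessed files"); the session name is illustrative:

from warehouse.packaging.models import MissingDatasetFile

# .is_(None) compiles to IS NULL, matching rows the task has not handled;
# a plain == True/False comparison would silently skip them.
unprocessed = (
    session.query(MissingDatasetFile)
    .filter(MissingDatasetFile.processed.is_(None))
    .all()
)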
