
Commit b6981ee (1 parent: ad84efc)

feat(river): persist progressions with topicleaner

9 files changed, 121 insertions(+), 51 deletions(-)

django/river/api/serializers/serializers.py (+17 -1)

@@ -3,6 +3,7 @@
 
 from rest_framework import serializers
 
+from pyrog import models as pyrog_models
 from river import models
 from river.adapters.progression_counter import RedisProgressionCounter
 
@@ -13,9 +14,24 @@ class Meta:
         fields = "__all__"
 
 
+class ResourceForProgressionSerializer(serializers.ModelSerializer):
+    class Meta:
+        model = pyrog_models.Resource
+        fields = ["id", "definition_id", "label"]
+
+
+class ProgressionSerializer(serializers.ModelSerializer):
+    resource = ResourceForProgressionSerializer(read_only=True)
+
+    class Meta:
+        model = models.Progression
+        fields = ["resource", "extracted", "loaded", "failed"]
+
+
 class BatchSerializer(serializers.ModelSerializer):
+    # FIXME errors is a property of BaseSerializer, we shouldn't override it
     errors = ErrorSerializer(many=True, read_only=True)
-    progressions = serializers.SerializerMethodField()
+    progressions = ProgressionSerializer(many=True, read_only=True)
 
     class Meta:
         model = models.Batch
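With the nested serializers, progressions is rendered as a list of objects instead of the former [label, counts] pairs produced by the SerializerMethodField. A sketch of one element of the list, mirroring the e2e assertions further down (the resource id is a placeholder):

# One element of BatchSerializer's "progressions" output (sketch):
{
    "resource": {"id": "<resource-id>", "definition_id": "Patient", "label": ""},
    "extracted": 100,
    "loaded": 50,
    "failed": None,
}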

django/river/api/views.py (+6)

@@ -45,6 +45,12 @@ def create(self, request, *args, **kwargs):
 
         batch_instance = serializer.save()
 
+        # Create Progressions
+        for resource in resources:
+            river_models.Progression.objects.create(
+                batch=batch_instance, resource=resource, extracted=None, loaded=None, failed=None
+            )
+
         topics_manager = KafkaTopicsManager()
         event_publisher = KafkaEventPublisher()
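One empty row is created per resource up front, so later code only ever updates existing rows. If Progression has no custom save() logic and its id field self-generates, the same initialization could be a single query; a hedged alternative sketch (both assumptions unverified here):

# Alternative sketch: one INSERT for all resources via bulk_create,
# assuming no save() side effects and a self-generating id field.
river_models.Progression.objects.bulk_create(
    [river_models.Progression(batch=batch_instance, resource=resource) for resource in resources]
)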

django/river/services.py (+15 -10)

@@ -1,4 +1,5 @@
 import json
+import logging
 from typing import Any, List, Optional, Tuple
 
 from django.utils import timezone
@@ -16,6 +17,8 @@
 from river.transformer.transformer import Transformer
 from utils.json import CustomJSONEncoder
 
+logger = logging.getLogger(__name__)
+
 
 def batch(
     batch_id: str,
@@ -37,18 +40,20 @@ def abort(batch: models.Batch, topics_manager: TopicsManager, counter: Progressi
     for base_topic in ["batch", "extract", "transform", "load"]:
         topics_manager.delete(f"{base_topic}.{batch.id}")
 
-    # Persist progressions in DB
+    # Update Progressions in DB
     for resource in batch.resources.all():
-        resource_progression = counter.get(f"{batch.id}:{resource.id}")
-        if not resource_progression:
+        redis_progression = counter.get(f"{batch.id}:{resource.id}")
+        if not redis_progression:
             continue
-        models.Progression.objects.create(
-            batch=batch,
-            resource=resource,
-            extracted=resource_progression.extracted,
-            loaded=resource_progression.loaded,
-            failed=resource_progression.failed,
-        )
+        try:
+            resource_progression = models.Progression.objects.get(batch=batch, resource=resource)
+        except models.Progression.DoesNotExist:
+            logger.warning(f"Could not find progression of resource {resource} in batch {batch}")
+            continue
+        resource_progression.extracted = redis_progression.extracted
+        resource_progression.loaded = redis_progression.loaded
+        resource_progression.failed = redis_progression.failed
+        resource_progression.save()
 
     batch.canceled_at = timezone.now()
     batch.save()
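abort() now mutates the pre-created rows and logs-and-skips missing ones instead of recreating them. Assuming Progression.save() carries no extra logic, an equivalent sketch using a queryset update would avoid the fetch-then-save round trip:

# Sketch: same update in a single query per resource (assumes no
# save()/signal side effects on Progression).
updated = models.Progression.objects.filter(batch=batch, resource=resource).update(
    extracted=redis_progression.extracted,
    loaded=redis_progression.loaded,
    failed=redis_progression.failed,
)
if not updated:
    logger.warning(f"Could not find progression of resource {resource} in batch {batch}")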

django/river/topicleaner/service.py (+17 -13)

@@ -16,7 +16,7 @@ def teardown_after_batch(batch: Batch, topics: TopicsManager):
         topics.delete(f"{base_topic}.{batch.id}")
 
 
-def clean(counter: ProgressionCounter, topics: TopicsManager):
+def task(counter: ProgressionCounter, topics: TopicsManager):
     current_batches = Batch.objects.filter(completed_at__isnull=True, canceled_at__isnull=True).prefetch_related(
         "resources"
     )
@@ -26,6 +26,21 @@ def clean(counter: ProgressionCounter, topics: TopicsManager):
             resource: counter.get(f"{batch.id}:{resource.id}") for resource in batch.resources.all()
         }
 
+        # Update Progressions in DB
+        for resource, redis_progression in resources_progressions.items():
+            if not redis_progression:
+                continue
+            try:
+                resource_progression = models.Progression.objects.get(batch=batch, resource=resource)
+            except models.Progression.DoesNotExist:
+                logger.warning(f"Could not find progression of resource {resource} in batch {batch}")
+                continue
+            resource_progression.extracted = redis_progression.extracted
+            resource_progression.loaded = redis_progression.loaded
+            resource_progression.failed = redis_progression.failed
+            resource_progression.save()
+
+        # Clear if needed
         if all(
             [
                 progression is not None
@@ -36,17 +51,6 @@ def clean(counter: ProgressionCounter, topics: TopicsManager):
         ):
             logger.info(f"Deleting batch {batch}.")
 
-            for resource, progression in resources_progressions.items():
-                if not progression:
-                    continue
-                models.Progression.objects.create(
-                    batch=batch,
-                    resource=resource,
-                    extracted=progression.extracted,
-                    loaded=progression.loaded,
-                    failed=progression.failed,
-                )
-
             teardown_after_batch(batch, topics)
             batch.completed_at = timezone.now()
             batch.save()
@@ -56,5 +60,5 @@
 
 def run(counter: ProgressionCounter, topics: TopicsManager):
     while True:
-        clean(counter=counter, topics=topics)
+        task(counter=counter, topics=topics)
         sleep(10)
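Net effect of moving the persistence block ahead of the completion check: run() now writes counters back to the database on every 10-second tick, not only when a batch finishes, so API reads stay close to live. A usage sketch of one tick against the in-memory doubles from the unit tests:

# One cleaner tick against in-memory test doubles (constructor
# arguments mirror the unit tests below).
from river.adapters.progression_counter import InMemoryProgressionCounter
from river.adapters.topics import InMemoryTopicsManager
from river.topicleaner.service import task

counter = InMemoryProgressionCounter(counts={})
topics = InMemoryTopicsManager(topics=[])
task(counter=counter, topics=topics)  # updates Progressions, tears down finished batches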

tests/river/conftest.py (+1)

@@ -21,6 +21,7 @@
 
 register(factories.BatchFactory)
 register(factories.ErrorFactory)
+register(factories.ProgressionFactory)
 register(ResourceFactory)
 register(SourceFactory)
 
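register() here is pytest-factoryboy's; registering the new factory is what exposes the progression_factory fixture used throughout the updated tests. A minimal illustration (hypothetical test):

# pytest-factoryboy derives fixture names from the factory class name:
# ProgressionFactory -> progression_factory
def test_progression_defaults(progression_factory):
    progression = progression_factory.create()
    assert progression.extracted == 100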

tests/river/e2e/api/test_batch.py (+31 -7)

@@ -58,11 +58,13 @@ def test_list_batch(api_client, batch_factory):
 
 
 @pytest.mark.redis
-def test_get_batch_progression(api_client, batch_factory, resource_factory, redis_client):
+def test_get_batch_progression(api_client, resource_factory, batch_factory, progression_factory, redis_client):
     url = reverse("batches-list")
     r1 = resource_factory.create(definition_id="Patient")
     r2 = resource_factory.create(definition_id="Practitioner")
     batch = batch_factory.create(resources=[r1, r2])
+    progression_factory.create(batch=batch, resource=r1)
+    progression_factory.create(batch=batch, resource=r2)
 
     set_counters(redis_client, batch, r1, r2)
 
@@ -72,8 +74,18 @@ def test_get_batch_progression(api_client, batch_factory, resource_factory, redi
     assert len(response.data) == 1
     batch_response = response.json()[0]
     assert batch_response["progressions"] == [
-        ["Patient", {"extracted": 10, "loaded": 5, "failed": None}],
-        ["Practitioner", {"extracted": 20, "loaded": 5, "failed": 3}],
+        {
+            "resource": {"id": r1.id, "definition_id": "Patient", "label": ""},
+            "extracted": 100,
+            "loaded": 50,
+            "failed": None,
+        },
+        {
+            "resource": {"id": r2.id, "definition_id": "Practitioner", "label": ""},
+            "extracted": 100,
+            "loaded": 50,
+            "failed": None,
+        },
     ]
 
 
@@ -106,10 +118,12 @@ def test_retrieve_batch(api_client, batch_factory, resource_factory):
 
 
 @pytest.mark.redis
-def test_delete_batch(api_client, redis_client, batch_factory, resource_factory, kafka_admin):
+def test_delete_batch(api_client, redis_client, resource_factory, batch_factory, progression_factory, kafka_admin):
     r1 = resource_factory.create(definition_id="Patient")
     r2 = resource_factory.create(definition_id="Practitioner")
     batch = batch_factory.create(resources=[r1, r2])
+    progression_factory.create(batch=batch, resource=r1)
+    progression_factory.create(batch=batch, resource=r2)
     url = reverse("batches-detail", kwargs={"pk": batch.id})
 
     set_counters(redis_client, batch, r1, r2)
@@ -120,11 +134,21 @@ def test_delete_batch(api_client, redis_client, batch_factory, resource_factory,
     clear_counters(redis_client, batch, r1, r2)
 
     response_get = api_client.get(url)
-    print(response_get.json())
     assert response_get.json()["canceled_at"] is not None
+    print(response_get.json()["progressions"])
     assert response_get.json()["progressions"] == [
-        ["Patient", {"extracted": 10, "loaded": 5, "failed": None}],
-        ["Practitioner", {"extracted": 20, "loaded": 5, "failed": 3}],
+        {
+            "resource": {"id": r1.id, "definition_id": "Patient", "label": ""},
+            "extracted": 10,
+            "loaded": 5,
+            "failed": None,
+        },
+        {
+            "resource": {"id": r2.id, "definition_id": "Practitioner", "label": ""},
+            "extracted": 20,
+            "loaded": 5,
+            "failed": 3,
+        },
     ]
 
     # Check that topics are deleted
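A note on the asymmetric expectations: test_get_batch_progression now asserts the ProgressionFactory defaults (extracted=100, loaded=50) because the list endpoint reads progressions straight from the database rows created with the batch, whereas test_delete_batch asserts the Redis-derived counts (10 and 20) because abort() copies the counters into those rows before the test re-reads the batch.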

tests/river/factories.py (+10)

@@ -37,3 +37,13 @@ class Meta:
 
     id = factory.Sequence(lambda n: f"batch_id_{n:04d}")
     batch = factory.SubFactory(BatchFactory)
+
+
+class ProgressionFactory(factory.django.DjangoModelFactory):
+    class Meta:
+        model = "river.Progression"
+
+    id = factory.Sequence(lambda n: f"progression_id_{n:04d}")
+    extracted = 100
+    loaded = 50
+    failed = None
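The defaults above (extracted=100, loaded=50, failed=None) are exactly what test_get_batch_progression asserts; tests that need other counts can override them per call:

# Usage sketch: override factory defaults at call time.
progression_factory.create(batch=batch, resource=r1, extracted=10, loaded=5, failed=None)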

tests/river/unit/test_services.py (+11 -11)

@@ -25,10 +25,12 @@ def test_batch(batch_factory, resource_factory):
     ]
 
 
-def test_abort(batch_factory, resource_factory):
+def test_abort(resource_factory, batch_factory, progression_factory):
     r1 = resource_factory.create(definition_id="Patient")
     r2 = resource_factory.create(definition_id="Practitioner")
     batch = batch_factory.create(resources=[r1, r2])
+    progression_factory.create(batch=batch, resource=r1)
+    progression_factory.create(batch=batch, resource=r2)
 
     topics = InMemoryTopicsManager(
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
@@ -43,16 +45,14 @@ def test_abort(batch_factory, resource_factory):
 
     assert topics._topics == set()
     assert batch.canceled_at is not None
-    r1_progressions = models.Progression.objects.filter(batch=batch, resource=r1)
-    assert len(r1_progressions) == 1
-    assert r1_progressions[0].extracted == 100
-    assert r1_progressions[0].loaded == 20
-    assert r1_progressions[0].failed == 3
-    r2_progressions = models.Progression.objects.filter(batch=batch, resource=r2)
-    assert len(r2_progressions) == 1
-    assert r2_progressions[0].extracted == 200
-    assert r2_progressions[0].loaded == 10
-    assert r2_progressions[0].failed is None
+    r1_progression = models.Progression.objects.get(batch=batch, resource=r1)
+    assert r1_progression.extracted == 100
+    assert r1_progression.loaded == 20
+    assert r1_progression.failed == 3
+    r2_progression = models.Progression.objects.get(batch=batch, resource=r2)
+    assert r2_progression.extracted == 200
+    assert r2_progression.loaded == 10
+    assert r2_progression.failed is None
 
 
 @pytest.mark.skip(reason="feature not implemented yet")
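Replacing filter() plus length assertions with get() tightens these checks: get() raises Progression.DoesNotExist when the row is missing and MultipleObjectsReturned when the (batch, resource) pair is duplicated, so existence and uniqueness are asserted implicitly and the explicit len(...) == 1 check becomes unnecessary.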

tests/river/unit/topicleaner/test_service.py (+13 -9)

@@ -3,22 +3,24 @@
 from river import models
 from river.adapters.progression_counter import InMemoryProgressionCounter
 from river.adapters.topics import InMemoryTopicsManager
-from river.topicleaner.service import clean
+from river.topicleaner.service import task as topicleaner_task
 
 pytestmark = pytest.mark.django_db
 
 
-def test_done_batch_is_cleaned(batch_factory, resource_factory):
+def test_done_batch_is_cleaned(resource_factory, batch_factory, progression_factory):
     r1, r2 = resource_factory.create_batch(2)
     batch = batch_factory.create(resources=[r1, r2])
+    progression_factory.create(batch=batch, resource=r1)
+    progression_factory.create(batch=batch, resource=r2)
     counters = InMemoryProgressionCounter(
         counts={f"{batch.id}:{resource.id}": {"extracted": 10, "loaded": 10} for resource in batch.resources.all()}
     )
     topics = InMemoryTopicsManager(
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
     )
 
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics == set()
     batches = models.Batch.objects.all()
@@ -30,9 +32,11 @@ def test_done_batch_is_cleaned(batch_factory, resource_factory):
     assert progression.failed is None
 
 
-def test_done_batch_is_cleaned_with_failed(batch_factory, resource_factory):
+def test_done_batch_is_cleaned_with_failed(resource_factory, batch_factory, progression_factory):
     r1, r2 = resource_factory.create_batch(2)
     batch = batch_factory.create(resources=[r1, r2])
+    progression_factory.create(batch=batch, resource=r1)
+    progression_factory.create(batch=batch, resource=r2)
     counters = InMemoryProgressionCounter(
         counts={
             f"{batch.id}:{resource.id}": {"extracted": 10, "loaded": 6, "failed": 4}
@@ -42,7 +46,7 @@ def test_done_batch_is_cleaned_with_failed(batch_factory, resource_factory):
     topics = InMemoryTopicsManager(
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
     )
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics == set()
     batches = models.Batch.objects.all()
@@ -64,7 +68,7 @@ def test_ongoing_batch_is_not_cleaned(batch_factory, resource_factory):
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
     )
 
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics != set()
 
@@ -82,7 +86,7 @@ def test_ongoing_batch_is_not_cleaned_with_failed(batch_factory, resource_factor
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
     )
 
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics != set()
 
@@ -97,7 +101,7 @@ def test_none_counter_prevents_cleaning(batch_factory, resource_factory):
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
    )
 
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics != set()
 
@@ -112,6 +116,6 @@ def test_missing_counter_prevents_cleaning(batch_factory, resource_factory):
         topics=[f"{base_topic}.{batch.id}" for base_topic in ["batch", "extract", "transform", "load"]]
     )
 
-    clean(counters, topics)
+    topicleaner_task(counters, topics)
 
     assert topics._topics != set()
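These tests pivot on the counter returning None for absent keys. A minimal sketch of the contract they exercise, assuming a simple value object with extracted/loaded/failed attributes (an illustration, not the project's actual adapter):

# Hypothetical re-implementation of the in-memory counter contract.
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ProgressionCount:
    extracted: Optional[int] = None
    loaded: Optional[int] = None
    failed: Optional[int] = None


class InMemoryCounter:
    def __init__(self, counts: Optional[Dict[str, dict]] = None):
        self._counts = {key: ProgressionCount(**value) for key, value in (counts or {}).items()}

    def get(self, key: str) -> Optional[ProgressionCount]:
        # Missing or None entries block cleaning in the service loop.
        return self._counts.get(key)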
