This repository has been archived by the owner on Jan 4, 2023. It is now read-only.

Add number of extracted, loaded, and failed resources in batch responses #693

Closed · wants to merge 13 commits
55 changes: 55 additions & 0 deletions django/river/api/serializers/serializers.py
@@ -1,6 +1,11 @@
import dataclasses
from typing import Dict, List, Tuple

from rest_framework import serializers

from pyrog import models as pyrog_models
from river import models
from river.adapters.progression_counter import RedisProgressionCounter


class ErrorSerializer(serializers.ModelSerializer):
@@ -9,8 +14,24 @@ class Meta:
        fields = "__all__"


class ResourceForProgressionSerializer(serializers.ModelSerializer):
    class Meta:
        model = pyrog_models.Resource
        fields = ["id", "definition_id", "label"]


class ProgressionSerializer(serializers.ModelSerializer):
    resource = ResourceForProgressionSerializer(read_only=True)

    class Meta:
        model = models.Progression
        fields = ["resource", "extracted", "loaded", "failed"]


class BatchSerializer(serializers.ModelSerializer):
    # FIXME errors is a property of BaseSerializer, we shouldn't override it
    errors = ErrorSerializer(many=True, read_only=True)
    progressions = ProgressionSerializer(many=True, read_only=True)

    class Meta:
        model = models.Batch
@@ -21,6 +42,40 @@ class Meta:
            "completed_at": {"allow_null": True},
        }

    def get_progressions(self, obj) -> List[Tuple[str, Dict]]:
Reviewer comment (Contributor): I think this is not used anymore: you use ProgressionSerializer and ResourceForProgressionSerializer instead.

"""
Fetch the number of extracted and loaded resources from redis.
Returns a list of lists that looks like:
[
["Patient", {"extracted": 100, "loaded": 20, "failed": 3}],
["Practitioner (nurse)", {"extracted": 200, "loaded": 10, "failed": None}],
]
"""
counter = RedisProgressionCounter()

def resource_name_with_label(resource):
return f"{resource.definition_id}{f' ({resource.label})' if resource.label else ''}"

# If batch is over, the counter won't necessarily be in redis
if models.Progression.objects.filter(batch=obj):
return [
(
resource_name_with_label(progression.resource),
{"extracted": progression.extracted, "loaded": progression.loaded, "failed": progression.failed},
)
for progression in models.Progression.objects.filter(batch=obj)
]

progressions = [
[
resource_name_with_label(resource),
dataclasses.asdict(counter.get(f"{obj.id}:{resource.id}")),
]
for resource in obj.resources.all()
]

return progressions


class PreviewRequestSerializer(serializers.Serializer):
    resource_id = serializers.CharField()
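With these serializers, a serialized batch now carries a progressions entry next to errors. A rough, hypothetical illustration of the new output (ids and counts are invented for the example; real values come from the Progression rows):

# Hypothetical illustration only; field values are made up.
serializers.BatchSerializer(batch).data["progressions"]
# [
#     {"resource": {"id": "ckv...", "definition_id": "Patient", "label": ""},
#      "extracted": 100, "loaded": 97, "failed": 3},
#     {"resource": {"id": "ckw...", "definition_id": "Practitioner", "label": "nurse"},
#      "extracted": 200, "loaded": 10, "failed": None},
# ]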
10 changes: 9 additions & 1 deletion django/river/api/views.py
@@ -9,6 +9,7 @@
from pyrog.api.serializers.mapping import MappingSerializer
from river import models as river_models
from river.adapters.event_publisher import KafkaEventPublisher
from river.adapters.progression_counter import RedisProgressionCounter
from river.adapters.topics import KafkaTopicsManager
from river.api import filters
from river.api.serializers import serializers
@@ -44,6 +45,12 @@ def create(self, request, *args, **kwargs):

        batch_instance = serializer.save()

        # Create Progressions
        for resource in resources:
            river_models.Progression.objects.create(
                batch=batch_instance, resource=resource, extracted=None, loaded=None, failed=None
            )

        topics_manager = KafkaTopicsManager()
        event_publisher = KafkaEventPublisher()

@@ -55,7 +62,8 @@ def destroy(self, request, *args, **kwargs):
        batch_instance = self.get_object()

        topics_manager = KafkaTopicsManager()
-        abort(batch_instance, topics_manager)
+        redis_counter = RedisProgressionCounter()
+        abort(batch_instance, topics_manager, redis_counter)

        return response.Response(status=status.HTTP_204_NO_CONTENT)

39 changes: 39 additions & 0 deletions django/river/migrations/0009_progression.py
@@ -0,0 +1,39 @@
# Generated by Django 3.1.12 on 2021-10-26 09:49

import django.db.models.deletion
from django.db import migrations, models

import cuid


class Migration(migrations.Migration):

    dependencies = [
        ("river", "0008_batch_resources"),
    ]

    operations = [
        migrations.CreateModel(
            name="Progression",
            fields=[
                ("id", models.TextField(default=cuid.cuid, editable=False, primary_key=True, serialize=False)),
                ("extracted", models.IntegerField(null=True)),
                ("loaded", models.IntegerField(null=True)),
                ("failed", models.IntegerField(null=True)),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                (
                    "batch",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE, related_name="progressions", to="river.batch"
                    ),
                ),
                (
                    "resource",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE, related_name="progressions", to="pyrog.resource"
                    ),
                ),
            ],
        ),
    ]
11 changes: 11 additions & 0 deletions django/river/models.py
@@ -20,3 +20,14 @@ class Error(models.Model):
    event = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class Progression(models.Model):
    id = models.TextField(primary_key=True, default=cuid, editable=False)
    batch = models.ForeignKey(Batch, related_name="progressions", on_delete=models.CASCADE)
    resource = models.ForeignKey(pyrog_models.Resource, related_name="progressions", on_delete=models.CASCADE)
    extracted = models.IntegerField(null=True)
    loaded = models.IntegerField(null=True)
    failed = models.IntegerField(null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
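The services below depend on river.adapters.progression_counter, which is not part of this diff. Judging from how it is called — counter.get(f"{batch.id}:{resource.id}") returns either None or an object with extracted, loaded and failed that dataclasses.asdict accepts — the adapter presumably looks roughly like the sketch below; the class names, Redis layout and every other detail here are assumptions, not the actual implementation:

# Hypothetical sketch of the progression-counter adapter assumed by this PR.
# The real river/adapters/progression_counter.py may differ in names and storage layout.
import dataclasses
from typing import Optional

import redis


@dataclasses.dataclass
class Progression:
    extracted: Optional[int] = None
    loaded: Optional[int] = None
    failed: Optional[int] = None


class ProgressionCounter:
    """Port: counters are looked up by a "<batch_id>:<resource_id>" key."""

    def get(self, key: str) -> Optional[Progression]:
        raise NotImplementedError


class RedisProgressionCounter(ProgressionCounter):
    def __init__(self, client: Optional[redis.Redis] = None):
        self._client = client or redis.Redis()

    def get(self, key: str) -> Optional[Progression]:
        # Assumes one Redis hash per key; returns None when nothing was counted yet.
        raw = self._client.hgetall(key)
        if not raw:
            return None
        return Progression(
            extracted=int(raw[b"extracted"]) if b"extracted" in raw else None,
            loaded=int(raw[b"loaded"]) if b"loaded" in raw else None,
            failed=int(raw[b"failed"]) if b"failed" in raw else None,
        )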
23 changes: 21 additions & 2 deletions django/river/services.py
@@ -1,4 +1,5 @@
import json
import logging
from typing import Any, List, Optional, Tuple

from django.utils import timezone
@@ -7,6 +8,7 @@
from pyrog.models import Resource
from river import models
from river.adapters.event_publisher import EventPublisher
from river.adapters.progression_counter import ProgressionCounter
from river.adapters.topics import TopicsManager
from river.common.analyzer import Analyzer
from river.common.database_connection.db_connection import DBConnection
@@ -15,6 +17,8 @@
from river.transformer.transformer import Transformer
from utils.json import CustomJSONEncoder

logger = logging.getLogger(__name__)


def batch(
    batch_id: str,
@@ -32,12 +36,27 @@ def batch(
)


-def abort(batch: models.Batch, topics_manager: TopicsManager) -> None:
+def abort(batch: models.Batch, topics_manager: TopicsManager, counter: ProgressionCounter) -> None:
    for base_topic in ["batch", "extract", "transform", "load"]:
        topics_manager.delete(f"{base_topic}.{batch.id}")

+    # Update Progressions in DB
+    for resource in batch.resources.all():
+        redis_progression = counter.get(f"{batch.id}:{resource.id}")
+        if not redis_progression:
+            continue
+        try:
+            resource_progression = models.Progression.objects.get(batch=batch, resource=resource)
+        except models.Progression.DoesNotExist:
+            logger.warning(f"Could not find progression of resource {resource} in batch {batch}")
+            continue
+        resource_progression.extracted = redis_progression.extracted
+        resource_progression.loaded = redis_progression.loaded
+        resource_progression.failed = redis_progression.failed
+        resource_progression.save()
+
    batch.canceled_at = timezone.now()
-    batch.save(update_fields=["canceled_at"])
+    batch.save()


def retry(batch: models.Batch) -> None:
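Passing the counter into abort() (rather than instantiating it inside) keeps the Redis dependency at the call site, so the new persistence step can be exercised with an in-memory fake. A hypothetical test sketch, not part of the PR — fixture names and the Progression dataclass are assumptions:

# Hypothetical test sketch; fixtures and the Progression dataclass are assumed.
class FakeProgressionCounter:
    def __init__(self, progressions):
        self._progressions = progressions  # maps "<batch_id>:<resource_id>" to a Progression

    def get(self, key):
        return self._progressions.get(key)


def test_abort_snapshots_progressions(batch, fake_topics_manager):
    counter = FakeProgressionCounter(
        {f"{batch.id}:{r.id}": Progression(extracted=10, loaded=8, failed=2) for r in batch.resources.all()}
    )
    abort(batch, fake_topics_manager, counter)
    batch.refresh_from_db()
    assert batch.canceled_at is not None
    assert all(p.loaded == 8 for p in batch.progressions.all())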
41 changes: 27 additions & 14 deletions django/river/topicleaner/service.py
@@ -1,9 +1,9 @@
import logging
from time import sleep
-from typing import List

from django.utils import timezone

+from river import models
from river.adapters.progression_counter import ProgressionCounter
from river.adapters.topics import TopicsManager
from river.models import Batch
@@ -16,36 +16,49 @@ def teardown_after_batch(batch: Batch, topics: TopicsManager):
        topics.delete(f"{base_topic}.{batch.id}")


-def clean(counter: ProgressionCounter, topics: TopicsManager):
+def task(counter: ProgressionCounter, topics: TopicsManager):
    current_batches = Batch.objects.filter(completed_at__isnull=True, canceled_at__isnull=True).prefetch_related(
        "resources"
    )
-    batches_to_delete: List[Batch] = []

    for batch in current_batches:
-        resources_progressions = [counter.get(f"{batch.id}:{resource.id}") for resource in batch.resources.all()]
+        resources_progressions = {
+            resource: counter.get(f"{batch.id}:{resource.id}") for resource in batch.resources.all()
+        }

+        # Update Progressions in DB
+        for resource, redis_progression in resources_progressions.items():
+            if not redis_progression:
+                continue
+            try:
+                resource_progression = models.Progression.objects.get(batch=batch, resource=resource)
+            except models.Progression.DoesNotExist:
+                logger.warning(f"Could not find progression of resource {resource} in batch {batch}")
+                continue
+            resource_progression.extracted = redis_progression.extracted
+            resource_progression.loaded = redis_progression.loaded
+            resource_progression.failed = redis_progression.failed
+            resource_progression.save()
+
+        # Clear if needed
        if all(
            [
                progression is not None
                and progression.extracted is not None
                and ((progression.loaded or 0) + (progression.failed or 0)) >= progression.extracted
-                for progression in resources_progressions
+                for progression in resources_progressions.values()
            ]
        ):
-            batches_to_delete.append(batch)
+            logger.info(f"Deleting batch {batch}.")

-    if batches_to_delete:
-        logger.info(f"Deleting batches: {batches_to_delete}.")
+            teardown_after_batch(batch, topics)
+            batch.completed_at = timezone.now()
+            batch.save()

-    for batch in batches_to_delete:
-        teardown_after_batch(batch, topics)
-        batch.completed_at = timezone.now()
-        batch.save()
-        logger.info(f"Batch {batch} deleted.")
+            logger.info(f"Batch {batch} deleted.")


def run(counter: ProgressionCounter, topics: TopicsManager):
    while True:
-        clean(counter=counter, topics=topics)
+        task(counter=counter, topics=topics)
        sleep(10)
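run() simply calls task() every 10 seconds; the wiring that starts this loop is outside the diff, but it is presumably launched from a worker entrypoint along these lines (module paths and the entrypoint itself are assumptions):

# Hypothetical entrypoint sketch; the actual wiring lives outside this diff.
from river.adapters.progression_counter import RedisProgressionCounter
from river.adapters.topics import KafkaTopicsManager
from river.topicleaner.service import run

if __name__ == "__main__":
    run(counter=RedisProgressionCounter(), topics=KafkaTopicsManager())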
2 changes: 1 addition & 1 deletion requirements/base.txt
@@ -10,7 +10,7 @@ django-filter==2.4.0
django-revproxy==0.10.0
djangorestframework==3.12.4
dotty-dict==1.2.1
-drf-spectacular==0.14.0
+drf-spectacular==0.17.3
hiredis==1.1.0
jsonschema==3.0.2
mozilla-django-oidc==1.2.4