diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5820525..9b32cb9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches-ignore: - 'gh-pages' - + jobs: build: runs-on: ubuntu-latest @@ -21,10 +21,11 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - - name: Install dependencies && generate data for Osmose + - name: Install dependencies && generate data for Osmose run: | - ./install.sh + ./install.sh export PATH=$PATH:$PWD/ + python -m unittest group_opendata_by_station.py -v xsv --version mkdir output wget https://www.data.gouv.fr/fr/datasets/r/2729b192-40ab-4454-904d-735084dca3a3 --no-verbose --output-document=opendata_irve.csv 2>&1 diff --git a/group_opendata_by_station.py b/group_opendata_by_station.py index 22778fe..7f29e9d 100644 --- a/group_opendata_by_station.py +++ b/group_opendata_by_station.py @@ -3,19 +3,37 @@ import csv import re +import argparse from collections import Counter +from enum import IntFlag, auto + +class Socket(IntFlag): + EF = auto() + T2 = auto() + CHADEMO = auto() + CCS = auto() + +MAX_POWER_KW = { + Socket.EF: 4, + Socket.T2: 43, + Socket.CHADEMO: 63 +} station_list = {} station_attributes = [ 'nom_amenageur', 'siren_amenageur', 'contact_amenageur', 'nom_operateur', 'contact_operateur', 'telephone_operateur', 'nom_enseigne', 'id_station_itinerance', 'id_station_local', 'nom_station', 'implantation_station', 'code_insee_commune', 'nbre_pdc', 'station_deux_roues', 'raccordement', 'num_pdl', 'date_mise_en_service', 'observations', 'adresse_station' ] pdc_attributes = [ 'id_pdc_itinerance', 'id_pdc_local', 'puissance_nominale', 'prise_type_ef', 'prise_type_2', 'prise_type_combo_ccs', 'prise_type_chademo', 'prise_type_autre', 'gratuit', 'paiement_acte', 'paiement_cb', 'paiement_autre', 'tarification', 'condition_acces', 'reservation', 'accessibilite_pmr', 'restriction_gabarit', 'observations', 'date_maj', 'cable_t2_attache', 'datagouv_organization_or_owner', 'horaires' ] +socket_attributes = { 'prise_type_ef': Socket.EF, 'prise_type_2': Socket.T2, 'prise_type_chademo': Socket.CHADEMO, 'prise_type_combo_ccs': Socket.CCS } +errors = [] +power_stats = [] wrong_ortho = {} -with open('fixes_networks.csv', 'r') as csv_file: - csv_reader = csv.DictReader(csv_file, delimiter=',') - for row in csv_reader: - wrong_ortho[row["opendata_name"]] = row["better_name"] +parser = argparse.ArgumentParser(description='This will group, validate and sanitize a previously "consolidated" export of IRVE data from data.gouv.fr') +parser.add_argument('input', metavar='Input file', nargs='?', default='opendata_irve.csv', + help='csv input file') +parser.add_argument('-s','--power-stats', required=False, default=False, action='store_true', + help='Print power statistics on stdout') def validate_coord(lat_or_lon_text): try: @@ -57,6 +75,75 @@ def cleanPhoneNumber(phone): else: return None +def compute_max_power_per_socket_type(station, errors): + """ + Computes the aggregated max power per socket type accross all PDCs (PDLs) associated with the given station. + Only consider the most powerful socket per PDC. + """ + power_ef = power_t2 = power_chademo = power_ccs = 0 + for pdc in station['pdc_list']: + socket_mask = sum([ flag for socket_attr, flag in socket_attributes.items() if stringBoolToInt(pdc[socket_attr])==1 ]) + socket_mask = Socket(socket_mask) + power = float(pdc['puissance_nominale']) + if power >= 1000: + errors.append({"station_id" : station['attributes']['id_station_itinerance'], + "source": station['attributes']['source_grouped'], + "error": "puissance nominale déclarée suspecte (possible erreur W/kW)", + "detail": "puissance: {}, prises: {}".format(pdc['puissance_nominale'], socket_mask.name) + }) + # Convert from W to kW (>2MW should not exist) + # FIXME: Probably not usefull anymore. Data looks fine. + if power >= 2000: + power /= 1000 + + err_socket = report_socket_power_out_of_specs(power, socket_mask) + if err_socket is not None: + errors.append({"station_id" : station['attributes']['id_station_itinerance'], + "source": station['attributes']['source_grouped'], + "error": "puissance nominale déclarée pour prise {} supérieure à la norme ({})".format(err_socket.name, MAX_POWER_KW[err_socket]), + "detail": "puissance: {}, prises: {}".format(pdc['puissance_nominale'], socket_mask.name) + }) + + max_socket = get_most_powerful_socket(socket_mask) + power_ef = max(power_ef, power if Socket.EF == max_socket and err_socket != max_socket else 0) + power_t2 = max(power_t2, power if Socket.T2 == max_socket and err_socket != max_socket else 0) + power_chademo = max(power_chademo, power if Socket.CHADEMO == max_socket and err_socket != max_socket else 0) + power_ccs = max(power_ccs, power if Socket.CCS == max_socket else 0) + + return (power_ef, power_t2, power_chademo, power_ccs) + +def get_most_powerful_socket(socket_mask): + """ EF < T2 < CHADEMO < CCS + """ + if Socket.CCS in socket_mask: + return Socket.CCS + if Socket.CHADEMO in socket_mask and Socket.CCS in ~socket_mask: + return Socket.CHADEMO + elif Socket.T2 in socket_mask and Socket.CCS | Socket.CHADEMO in ~socket_mask: + return Socket.T2 + elif Socket.EF in socket_mask and Socket.CCS | Socket.CHADEMO | Socket.T2 in ~socket_mask: + return Socket.EF + return None + +def report_socket_power_out_of_specs(power, socket_mask): + """ + This check can only be done on the most powerful socket of the PDC. + Allow rounding errors (max +1 kw). No limits known for CCS. + """ + s = get_most_powerful_socket(socket_mask) + if s is None: return + err_socket = None + if s == Socket.CHADEMO: + if power > MAX_POWER_KW[Socket.CHADEMO] + 1: + err_socket = Socket.CHADEMO + elif s == Socket.T2: + if power > MAX_POWER_KW[Socket.T2] + 1: + err_socket = Socket.T2 + elif s == Socket.EF: + if power > MAX_POWER_KW[Socket.EF] + 1: + err_socket = Socket.EF + return err_socket + def stringBoolToInt(strbool): return 1 if strbool.lower() == 'true' else 0 @@ -73,198 +160,239 @@ def transformRef(refIti, refLoc): else: return None -errors = [] -with open('opendata_irve.csv') as csvfile: - reader = csv.DictReader(csvfile, delimiter=',') - for row in reader: - if not row['id_station_itinerance']: - errors.append({"station_id" : None, - "source": row['datagouv_organization_or_owner'], - "error": "pas d'identifiant ref:EU:EVSE (id_station_itinerance). Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", - "detail": None - }) - continue - if row['id_station_itinerance']=="Non concerné": - # Station non concernée par l'identifiant ref:EU:EVSE (id_station_itinerance). Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose - continue - - cleanRef = transformRef(row['id_station_itinerance'], row['id_station_local']) - - # Overkill given that this data should have passed through this code: - # https://github.com/datagouv/datagouvfr_data_pipelines/blob/75db0b1db3fd79407a1526b0950133114fefaa0f/schema/utils/geo.py#L33 - if not validate_coord(row["consolidated_longitude"]) or not validate_coord(row["consolidated_latitude"]): - errors.append({"station_id" : cleanRef or row['id_station_itinerance'], - "source": row['datagouv_organization_or_owner'], - "error": "coordonnées non valides. Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", - "detail": "consolidated_longitude: {}, consolidated_latitude: {}".format(row['consolidated_longitude'], row["consolidated_latitude"]) - }) - continue - - if not is_correct_id(cleanRef): - errors.append({"station_id" : cleanRef or row['id_station_itinerance'], - "source": row['datagouv_organization_or_owner'], - "error": "le format de l'identifiant ref:EU:EVSE (id_station_itinerance) n'est pas valide. Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", - "detail": "iti: %s, local: %s" % (row['id_station_itinerance'], row['id_station_local'])}) - continue - - if not cleanRef in station_list: - station_prop = {} - for key in station_attributes : - station_prop[key] = row[key] - if row[key] == "null": - station_prop[key] = "" - elif row[key] in wrong_ortho.keys(): - station_prop[key] = wrong_ortho[row[key]] - - station_prop['Xlongitude'] = float(row['consolidated_longitude']) - station_prop['Ylatitude'] = float(row['consolidated_latitude']) - phone = cleanPhoneNumber(row['telephone_operateur']) - station_list[cleanRef] = {'attributes' : station_prop, 'pdc_list': []} - - # Non-blocking issues - if phone is None and row['telephone_operateur']!= "": - station_prop['telephone_operateur'] = None - errors.append({"station_id" : cleanRef, - "source": row['datagouv_organization_or_owner'], - "error": "le numéro de téléphone de l'opérateur (telephone_operateur) est dans un format invalide", - "detail": row['telephone_operateur']}) - elif phone is not None: - station_prop['telephone_operateur'] = phone - else: - station_prop['telephone_operateur'] = None - - if row['station_deux_roues'].lower() not in ['true', 'false', '']: - station_prop['station_deux_roues'] = None - errors.append({"station_id" : cleanRef, - "source": row['datagouv_organization_or_owner'], - "error": "le champ station_deux_roues n'est pas valide", - "detail": row['station_deux_roues']}) - else: - station_prop['station_deux_roues'] = row['station_deux_roues'].lower() - - pdc_prop = {key: row[key] for key in pdc_attributes} - station_list[cleanRef]['pdc_list'].append(pdc_prop) - -# ~ all_prises_types = set() - -for station_id, station in station_list.items() : - station['attributes']['id_station_itinerance'] = station_id - sources = set([elem['datagouv_organization_or_owner'] for elem in station['pdc_list']]) - if len(sources) !=1 : - errors.append({"station_id" : station_id, - "source": "multiples", - "error": "plusieurs sources pour un même id", - "detail": sources - }) - station['attributes']['source_grouped'] = list(sources)[0] - - horaires = set([elem['horaires'].strip() for elem in station['pdc_list']]) - if len(horaires) !=1 : - station['attributes']['horaires_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs horaires pour une même station", - "detail": horaires}) - else : - station['attributes']['horaires_grouped'] = list(horaires)[0] - - gratuit = set([elem['gratuit'].strip().lower() for elem in station['pdc_list']]) - if len(gratuit) !=1 : - station['attributes']['gratuit_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs infos de gratuité (gratuit) pour une même station", - "detail": gratuit}) - else : - station['attributes']['gratuit_grouped'] = list(gratuit)[0] - - paiement_acte = set([elem['paiement_acte'].strip().lower() for elem in station['pdc_list']]) - if len(paiement_acte) !=1 : - station['attributes']['paiement_acte_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs infos de paiement (paiement_acte) pour une même station", - "detail": paiement_acte}) - else : - station['attributes']['paiement_acte_grouped'] = list(paiement_acte)[0] - - paiement_cb = set([elem['paiement_cb'].strip().lower() for elem in station['pdc_list']]) - if len(paiement_cb) !=1 : - station['attributes']['paiement_cb_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs infos de paiement (paiement_cb) pour une même station", - "detail": paiement_cb}) - else : - station['attributes']['paiement_cb_grouped'] = list(paiement_cb)[0] - - reservation = set([elem['reservation'].strip().lower() for elem in station['pdc_list']]) - if len(reservation) !=1 : - station['attributes']['reservation_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs infos de réservation pour une même station", - "detail": reservation}) - else : - station['attributes']['reservation_grouped'] = list(reservation)[0] - - accessibilite_pmr = set([elem['accessibilite_pmr'].strip() for elem in station['pdc_list']]) - if len(accessibilite_pmr) !=1 : - station['attributes']['accessibilite_pmr_grouped'] = None - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "plusieurs infos d'accessibilité PMR (accessibilite_pmr) pour une même station", - "detail": accessibilite_pmr}) - else : - station['attributes']['accessibilite_pmr_grouped'] = list(accessibilite_pmr)[0] - - if len(station['pdc_list']) != int(station['attributes']['nbre_pdc']): - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "le nombre de point de charge de la station n'est pas cohérent avec la liste des points de charge fournie", - "detail": "{} points de charge indiqués pour la station (nbre_pdc) mais {} points de charge listés".format(station['attributes']['nbre_pdc'], len(station['pdc_list']))}) - station['attributes']['nbre_pdc'] = None - - station['attributes']['nb_prises_grouped'] = len(station['pdc_list']) - - EF_count = sum([ stringBoolToInt(elem['prise_type_ef']) for elem in station['pdc_list'] ]) - station['attributes']['nb_EF_grouped'] = EF_count - - T2_count = sum([ stringBoolToInt(elem['prise_type_2']) for elem in station['pdc_list'] ]) - station['attributes']['nb_T2_grouped'] = T2_count - - combo_count = sum([ stringBoolToInt(elem['prise_type_combo_ccs']) for elem in station['pdc_list'] ]) - station['attributes']['nb_combo_ccs_grouped'] = combo_count - - chademo_count = sum([ stringBoolToInt(elem['prise_type_chademo']) for elem in station['pdc_list'] ]) - station['attributes']['nb_chademo_grouped'] = chademo_count - - autre_count = sum([ stringBoolToInt(elem['prise_type_autre']) for elem in station['pdc_list'] ]) - station['attributes']['nb_autre_grouped'] = autre_count - - if (EF_count + T2_count + combo_count + chademo_count + autre_count) == 0: - errors.append({"station_id" : station_id, - "source": station['attributes']['source_grouped'], - "error": "aucun type de prise précisé sur l'ensemble des points de charge", - "detail": "nb pdc: %s" % (len(station['pdc_list'])) - }) - -print("{} stations".format(len(station_list))) - -print("{} points de charge avec des erreurs :".format(len(errors))) -for error_type, error_count in Counter([elem['error'] for elem in errors]).items(): - print(" > {} : {} éléments".format(error_type, error_count)) - - -with open("output/opendata_errors.csv", 'w') as ofile: - tt = csv.DictWriter(ofile, fieldnames=errors[0].keys()) - tt.writeheader() - for elem in errors: - tt.writerow(elem) - -with open("output/opendata_stations.csv", 'w') as ofile: - tt = csv.DictWriter(ofile, fieldnames=station_list[list(station_list)[0]]["attributes"].keys()) - tt.writeheader() - for station_id, station in station_list.items(): - tt.writerow(station['attributes']) +if __name__ == "__main__": + args = parser.parse_args() + + with open('fixes_networks.csv', 'r') as csv_file: + csv_reader = csv.DictReader(csv_file, delimiter=',') + for row in csv_reader: + wrong_ortho[row["opendata_name"]] = row["better_name"] + + with open(args.input) as csvfile: + reader = csv.DictReader(csvfile, delimiter=',') + for row in reader: + if not row['id_station_itinerance']: + errors.append({"station_id" : None, + "source": row['datagouv_organization_or_owner'], + "error": "pas d'identifiant ref:EU:EVSE (id_station_itinerance). Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", + "detail": None + }) + continue + if row['id_station_itinerance']=="Non concerné": + # Station non concernée par l'identifiant ref:EU:EVSE (id_station_itinerance). Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose + continue + + cleanRef = transformRef(row['id_station_itinerance'], row['id_station_local']) + + # Overkill given that this data should have passed through this code: + # https://github.com/datagouv/datagouvfr_data_pipelines/blob/75db0b1db3fd79407a1526b0950133114fefaa0f/schema/utils/geo.py#L33 + if not validate_coord(row["consolidated_longitude"]) or not validate_coord(row["consolidated_latitude"]): + errors.append({"station_id" : cleanRef or row['id_station_itinerance'], + "source": row['datagouv_organization_or_owner'], + "error": "coordonnées non valides. Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", + "detail": "consolidated_longitude: {}, consolidated_latitude: {}".format(row['consolidated_longitude'], row["consolidated_latitude"]) + }) + continue + + if not is_correct_id(cleanRef): + errors.append({"station_id" : cleanRef or row['id_station_itinerance'], + "source": row['datagouv_organization_or_owner'], + "error": "le format de l'identifiant ref:EU:EVSE (id_station_itinerance) n'est pas valide. Ce point de charge est ignoré et sa station ne sera pas présente dans l'analyse Osmose", + "detail": "iti: %s, local: %s" % (row['id_station_itinerance'], row['id_station_local'])}) + continue + + if not cleanRef in station_list: + station_prop = {} + for key in station_attributes : + station_prop[key] = row[key] + if row[key] == "null": + station_prop[key] = "" + elif row[key] in wrong_ortho.keys(): + station_prop[key] = wrong_ortho[row[key]] + + station_prop['Xlongitude'] = float(row['consolidated_longitude']) + station_prop['Ylatitude'] = float(row['consolidated_latitude']) + phone = cleanPhoneNumber(row['telephone_operateur']) + station_list[cleanRef] = {'attributes' : station_prop, 'pdc_list': []} + + # Non-blocking issues + if phone is None and row['telephone_operateur']!= "": + station_prop['telephone_operateur'] = None + errors.append({"station_id" : cleanRef, + "source": row['datagouv_organization_or_owner'], + "error": "le numéro de téléphone de l'opérateur (telephone_operateur) est dans un format invalide", + "detail": row['telephone_operateur']}) + elif phone is not None: + station_prop['telephone_operateur'] = phone + else: + station_prop['telephone_operateur'] = None + + if row['station_deux_roues'].lower() not in ['true', 'false', '']: + station_prop['station_deux_roues'] = None + errors.append({"station_id" : cleanRef, + "source": row['datagouv_organization_or_owner'], + "error": "le champ station_deux_roues n'est pas valide", + "detail": row['station_deux_roues']}) + else: + station_prop['station_deux_roues'] = row['station_deux_roues'].lower() + + pdc_prop = {key: row[key] for key in pdc_attributes} + station_list[cleanRef]['pdc_list'].append(pdc_prop) + + # ~ all_prises_types = set() + + for station_id, station in station_list.items() : + station['attributes']['id_station_itinerance'] = station_id + sources = set([elem['datagouv_organization_or_owner'] for elem in station['pdc_list']]) + if len(sources) !=1 : + errors.append({"station_id" : station_id, + "source": "multiples", + "error": "plusieurs sources pour un même id", + "detail": sources + }) + station['attributes']['source_grouped'] = list(sources)[0] + + horaires = set([elem['horaires'].strip() for elem in station['pdc_list']]) + if len(horaires) !=1 : + station['attributes']['horaires_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs horaires pour une même station", + "detail": horaires}) + else : + station['attributes']['horaires_grouped'] = list(horaires)[0] + + gratuit = set([elem['gratuit'].strip().lower() for elem in station['pdc_list']]) + if len(gratuit) !=1 : + station['attributes']['gratuit_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs infos de gratuité (gratuit) pour une même station", + "detail": gratuit}) + else : + station['attributes']['gratuit_grouped'] = list(gratuit)[0] + + paiement_acte = set([elem['paiement_acte'].strip().lower() for elem in station['pdc_list']]) + if len(paiement_acte) !=1 : + station['attributes']['paiement_acte_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs infos de paiement (paiement_acte) pour une même station", + "detail": paiement_acte}) + else : + station['attributes']['paiement_acte_grouped'] = list(paiement_acte)[0] + + paiement_cb = set([elem['paiement_cb'].strip().lower() for elem in station['pdc_list']]) + if len(paiement_cb) !=1 : + station['attributes']['paiement_cb_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs infos de paiement (paiement_cb) pour une même station", + "detail": paiement_cb}) + else : + station['attributes']['paiement_cb_grouped'] = list(paiement_cb)[0] + + reservation = set([elem['reservation'].strip().lower() for elem in station['pdc_list']]) + if len(reservation) !=1 : + station['attributes']['reservation_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs infos de réservation pour une même station", + "detail": reservation}) + else : + station['attributes']['reservation_grouped'] = list(reservation)[0] + + accessibilite_pmr = set([elem['accessibilite_pmr'].strip() for elem in station['pdc_list']]) + if len(accessibilite_pmr) !=1 : + station['attributes']['accessibilite_pmr_grouped'] = None + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "plusieurs infos d'accessibilité PMR (accessibilite_pmr) pour une même station", + "detail": accessibilite_pmr}) + else : + station['attributes']['accessibilite_pmr_grouped'] = list(accessibilite_pmr)[0] + + if len(station['pdc_list']) != int(station['attributes']['nbre_pdc']): + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "le nombre de point de charge de la station n'est pas cohérent avec la liste des points de charge fournie", + "detail": "{} points de charge indiqués pour la station (nbre_pdc) mais {} points de charge listés".format(station['attributes']['nbre_pdc'], len(station['pdc_list']))}) + station['attributes']['nbre_pdc'] = None + + station['attributes']['nb_prises_grouped'] = len(station['pdc_list']) + + EF_count = sum([ stringBoolToInt(elem['prise_type_ef']) for elem in station['pdc_list'] ]) + station['attributes']['nb_EF_grouped'] = EF_count + + T2_count = sum([ stringBoolToInt(elem['prise_type_2']) for elem in station['pdc_list'] ]) + station['attributes']['nb_T2_grouped'] = T2_count + + combo_count = sum([ stringBoolToInt(elem['prise_type_combo_ccs']) for elem in station['pdc_list'] ]) + station['attributes']['nb_combo_ccs_grouped'] = combo_count + + chademo_count = sum([ stringBoolToInt(elem['prise_type_chademo']) for elem in station['pdc_list'] ]) + station['attributes']['nb_chademo_grouped'] = chademo_count + + autre_count = sum([ stringBoolToInt(elem['prise_type_autre']) for elem in station['pdc_list'] ]) + station['attributes']['nb_autre_grouped'] = autre_count + + if (EF_count + T2_count + combo_count + chademo_count + autre_count) == 0: + errors.append({"station_id" : station_id, + "source": station['attributes']['source_grouped'], + "error": "aucun type de prise précisé sur l'ensemble des points de charge", + "detail": "nb pdc: %s" % (len(station['pdc_list'])) + }) + + power_grouped_values = compute_max_power_per_socket_type(station, errors) + power_stats.append(power_grouped_values) + power_props = ['power_ef_grouped', 'power_t2_grouped', 'power_chademo_grouped', 'power_ccs_grouped'] + station['attributes'].update(zip(power_props, power_grouped_values)) + + if args.power_stats: + print("Computed power stats:") + print(" EF | T2 | Chademo | CCS |") + for power_set, count in Counter(power_stats).most_common(): + print(" > {:4.2f} {:5.2f} {:5.2f} {:6.2f} : {} occurences".format(*power_set, count)) + + print("{} stations".format(len(station_list))) + + print("{} points de charge avec des erreurs :".format(len(errors))) + for error_type, error_count in Counter([elem['error'] for elem in errors]).items(): + print(" > {} : {} éléments".format(error_type, error_count)) + + with open("output/opendata_errors.csv", 'w') as ofile: + tt = csv.DictWriter(ofile, fieldnames=errors[0].keys()) + tt.writeheader() + for elem in errors: + tt.writerow(elem) + + with open("output/opendata_stations.csv", 'w') as ofile: + tt = csv.DictWriter(ofile, fieldnames=station_list[list(station_list)[0]]["attributes"].keys()) + tt.writeheader() + for station_id, station in station_list.items(): + tt.writerow(station['attributes']) + +########################################################################### +import unittest + +class Test(unittest.TestCase): + import tests.power_test_data as data + + def test_get_most_powerfull_socket(self): + self.assertEqual(Socket.CCS, get_most_powerful_socket(Socket.CCS)) + self.assertEqual(Socket.CCS, get_most_powerful_socket(Socket.CHADEMO|Socket.CCS)) + self.assertEqual(Socket.CCS, get_most_powerful_socket(Socket.T2|Socket.CHADEMO|Socket.CCS)) + self.assertEqual(Socket.CCS, get_most_powerful_socket(Socket.EF|Socket.T2|Socket.CHADEMO|Socket.CCS)) + self.assertEqual(Socket.CHADEMO, get_most_powerful_socket(Socket.CHADEMO)) + self.assertEqual(Socket.CHADEMO, get_most_powerful_socket(Socket.T2|Socket.CHADEMO)) + self.assertEqual(Socket.CHADEMO, get_most_powerful_socket(Socket.EF|Socket.T2|Socket.CHADEMO)) + self.assertEqual(Socket.T2, get_most_powerful_socket(Socket.T2)) + self.assertEqual(Socket.T2, get_most_powerful_socket(Socket.EF|Socket.T2)) + self.assertEqual(Socket.EF, get_most_powerful_socket(Socket.EF)) + + def test_compute_max_power_per_socket_type(self): + for test in self.data.stations: + errors = [] + self.assertEqual(test["result"], compute_max_power_per_socket_type(test["station"], errors)) + self.assertEqual(test["errors"], errors) \ No newline at end of file diff --git a/tests/power_test_data.py b/tests/power_test_data.py new file mode 100644 index 0000000..2dfd14c --- /dev/null +++ b/tests/power_test_data.py @@ -0,0 +1,131 @@ +stations = [{ + "station": { + "attributes": { + "id_station_itinerance": "id1", + "source_grouped": "sc1" + }, + "pdc_list": [ + { + "puissance_nominale": 100, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "True", + 'prise_type_combo_ccs': "True", + }, + { + "puissance_nominale": 120, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "True", + 'prise_type_combo_ccs': "TRUE", + }, + { + "puissance_nominale": 80, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "true", + 'prise_type_combo_ccs': "false", + }, + ] + }, + "result": (0, 0, 0, 120.0), # EF, T2, Chademo, CCS + "errors": [{ + 'station_id': 'id1', + 'source': 'sc1', + 'error': 'puissance nominale déclarée pour prise CHADEMO supérieure à la norme (63)', + 'detail': 'puissance: 80, prises: EF|T2|CHADEMO' + }] +},{ + "station": { + "attributes": { + "id_station_itinerance": "id1", + "source_grouped": "sc1" + }, + "pdc_list": [ + { + "puissance_nominale": 22, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "not a bool", + 'prise_type_combo_ccs': "False", + }, + { + "puissance_nominale": 56, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "True", + 'prise_type_combo_ccs': "FALSE", + }, + { + "puissance_nominale": 4, + 'prise_type_ef': "True", + 'prise_type_2': "false", + 'prise_type_chademo': "false", + 'prise_type_combo_ccs': "false", + }, + ] + }, + "result": (4.0, 22.0, 56.0, 0), # EF, T2, Chademo, CCS + "errors": [] +},{ + "station": { + "attributes": { + "id_station_itinerance": "id1", + "source_grouped": "sc1" + }, + "pdc_list": [ + { + "puissance_nominale": 3600, + 'prise_type_ef': "True", + 'prise_type_2': "false", + 'prise_type_chademo': "not a bool", + 'prise_type_combo_ccs': "False", + }, + { + "puissance_nominale": 80, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "True", + 'prise_type_combo_ccs': "FALSE", + }, + { + "puissance_nominale": 50, + 'prise_type_ef': "True", + 'prise_type_2': "false", + 'prise_type_chademo': "TRUE", + 'prise_type_combo_ccs': "false", + }, + ] + }, + "result": (3.6, 0, 50.0, 0), # EF, T2, Chademo, CCS + "errors": [{ + 'station_id': 'id1', + 'source': 'sc1', + 'error': 'puissance nominale déclarée suspecte (possible erreur W/kW)', + 'detail': 'puissance: 3600, prises: EF' + },{ + 'station_id': 'id1', + 'source': 'sc1', + 'error': 'puissance nominale déclarée pour prise CHADEMO supérieure à la norme (63)', + 'detail': 'puissance: 80, prises: EF|T2|CHADEMO' + }] +},{ + "station": { + "attributes": { + "id_station_itinerance": "id1", + "source_grouped": "sc1" + }, + "pdc_list": [ + { + "puissance_nominale": 0, + 'prise_type_ef': "True", + 'prise_type_2': "True", + 'prise_type_chademo': "not a bool", + 'prise_type_combo_ccs': "False", + }, + ] + }, + "result": (0, 0, 0, 0), + "errors": [] +} +] \ No newline at end of file