Skip to content

Commit

Permalink
Travaux de qualification des flux IRVE dynamiques (#3816)
Browse files Browse the repository at this point in the history
* Adapt setup to rely on now-incorporated app resources

* Leverage now existing HTTPClient

* Remove legacy

* Adapt remaining code to fix the script

* Add io_ansi_table via hybrid mix setup

* Improve reporting

* Improve reporting

* Add TODOs

* Avoid parsing CSV multiple times

* Show validity information

* Add local frictionless validator

* Help me understand

* Fix cache code

* Disable cache

* Add notes on frictionless

See:
- frictionlessdata/frictionless-py#1646

* Start parsing report

* Move to some non-git-enabled folder

* Shorten name

* Dump frictionless output (not always reliable)
  • Loading branch information
thbar authored Mar 18, 2024
1 parent b273a49 commit 61eabf1
Showing 1 changed file with 177 additions and 38 deletions.
215 changes: 177 additions & 38 deletions scripts/irve/dynamic-irve.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
Mix.install([
{:req, "~> 0.3.9"},
{:jason, "~> 1.4"},
{:csv, "~> 3.0"}
])

Code.require_file(__DIR__ <> "/req_custom_cache.exs")
my_app_root = Path.join(__DIR__, "../..")

# hybrid setup to rely on the whole app setup but increment with a specificy dependency
Mix.install(
[
{:my_app, path: my_app_root, env: :dev},
{:io_ansi_table, "~> 1.0"}
],
config_path: Path.join(my_app_root, "config/config.exs"),
lockfile: Path.join(my_app_root, "mix.lock")
)

params = %{
page: 1,
Expand All @@ -17,13 +21,25 @@ url = "https://www.data.gouv.fr/api/1/datasets/?#{URI.encode_query(params)}"
defmodule Query do
def cache_dir, do: Path.join(__ENV__.file, "../cache-dir") |> Path.expand()

def cached_get!(url) do
req = Req.new() |> CustomCache.attach()
Req.get!(req, url: url, custom_cache_dir: cache_dir())
def cached_get!(url, options \\ []) do
options = [
decode_body: options |> Keyword.get(:decode_body, true),
enable_cache: options |> Keyword.get(:enable_cache, false)
]

options =
if options[:enable_cache] do
Keyword.merge(options, custom_cache_dir: cache_dir())
else
options
end

Transport.HTTPClient.get!(url, options)
end
end

%{status: 200, body: datasets} = Query.cached_get!(url)
# disabling cache because one dataset is refreshed very frequently, caching leads to 404
%{status: 200, body: datasets} = Query.cached_get!(url, enable_cache: false)

# ensure there is only one page + grab the data
unless is_nil(datasets["next_page"]), do: raise("should not have next page")
Expand All @@ -36,7 +52,13 @@ resources =
dataset["resources"]
|> Enum.filter(fn r -> r["schema"]["name"] == "etalab/schema-irve-dynamique" end)
|> Enum.map(fn r ->
Map.put(r, "dataset_url", dataset["page"])
r
|> Map.put("dataset_url", dataset["page"])
|> Map.put("organization", dataset["organization"]["name"])
|> Map.put("valid", get_in(r, ["extras", "validation-report:valid_resource"]))
|> Map.put("validation_date", get_in(r, ["extras", "validation-report:validation_date"]))
|> Map.put("schema_name", get_in(r, ["schema", "name"]))
|> Map.put("schema_version", get_in(r, ["schema", "version"]))
end)
end)
|> Enum.reject(fn r ->
Expand All @@ -45,28 +67,34 @@ resources =
r["id"] == "5ef6ddff-2f98-4300-9e6e-1b47ea4ab779"
end)

# IO.inspect resources |> Enum.map &(&1["url"])

defmodule IRVECheck do
def is_dynamic_irve?(url) do
%{status: 200, body: body} = Query.cached_get!(url)
def get_body(url) do
# control the decoding ourselves ; by default Req would decode via CSV itself
%{status: 200, body: body} = Query.cached_get!(url, decode_body: false)
body
end

# quick first decode to get the headers, even if the file has no rows
data =
[body]
|> CSV.decode!(headers: false)
|> Enum.take(1)
|> List.first()
def parse_csv(body) do
[body]
|> CSV.decode!(headers: true)
|> Enum.to_list()
end

"id_pdc_itinerance" in data && "etat_pdc" in data
def get_headers(body) do
[body]
# quick first decode to get the headers, even if the file has no rows
|> CSV.decode!(headers: false)
|> Enum.take(1)
|> List.first()
end

def time_window(url) do
%{status: 200, body: body} = Query.cached_get!(url)
def is_dynamic_irve?(headers) do
"id_pdc_itinerance" in headers && "etat_pdc" in headers
end

def time_window(rows) do
data =
[body]
|> CSV.decode!(headers: true)
rows
|> Enum.map(fn x -> (x["horodatage"] || "???") |> String.slice(0, 10) end)
|> Enum.to_list()
|> Enum.sort()
Expand All @@ -75,14 +103,125 @@ defmodule IRVECheck do
end
end

resources
|> Enum.each(fn r ->
IO.puts("\n" <> r["dataset_url"])

IO.puts(
r["url"] <>
" --- " <>
if(IRVECheck.is_dynamic_irve?(r["url"]), do: "OK", else: "KO") <>
" " <> (IRVECheck.time_window(r["url"]) |> inspect)
)
end)
# very brittle (false positives & false negatives) at the moment, but helped me a bit already
# Waiting for feedback on https://github.com/frictionlessdata/frictionless-py/issues/1646
defmodule FrictionlessValidator do
@latest_dynamic_irve_schema "https://schema.data.gouv.fr/schemas/etalab/schema-irve-dynamique/latest/schema-dynamique.json"

def validate(file_url, schema \\ @latest_dynamic_irve_schema) do
cmd = "frictionless"
# NOTE: I tried using `--schema-sync` as an attempt to avoid failure
# when an optional field column's header is missing.
args = ["validate", file_url, "--schema", schema, "--json"]
_debug_cmd = [cmd, args] |> List.flatten() |> Enum.join(" ")

{output, result} = System.cmd(cmd, args)

case result do
0 ->
{:ok, Jason.decode!(output)}

1 ->
{:error, Jason.decode!(output)}
end
end

# quick and dirty parsing
def errors_summary(output) do
output["tasks"]
|> Enum.map(& &1["errors"])
|> List.flatten()
|> Enum.map(& &1["message"])
|> Enum.take(5)
end
end

IO.puts("========== #{resources |> length()} candidates ==========\n\n")

rows =
resources
|> Enum.map(fn r ->
body = IRVECheck.get_body(r["url"])
rows = IRVECheck.parse_csv(body)
headers = IRVECheck.get_headers(body)

{local_valid, validation_result} = FrictionlessValidator.validate(r["url"])

File.write!(
"cache-dir/dyn-irve-" <> r["id"],
validation_result |> Jason.encode!() |> Jason.Formatter.pretty_print()
)

%{
dataset_url: r["dataset_url"],
r_id: r["id"],
organization: r["organization"],
resource_url: r["url"],
dyn_irve_likely: IRVECheck.is_dynamic_irve?(headers),
time_window: IRVECheck.time_window(rows),
rows: rows |> length(),
valid: r["valid"],
local_valid: local_valid,
v_date: r["validation_date"],
schema_name: r["schema_name"],
schema_version: r["schema_version"],
local_validation_errors: FrictionlessValidator.errors_summary(validation_result)
}
end)

IO.inspect(rows, IEx.inspect_opts())

IO.ANSI.Table.start(
[
:organization,
:r_id,
:dyn_irve_likely,
:rows,
# :dataset_url,
:valid,
:local_valid,
:v_date,
:schema_name,
:schema_version
],
sort_specs: [desc: :rows],
max_width: :infinity
)

IO.ANSI.Table.format(rows)
IO.ANSI.Table.stop()

IO.ANSI.Table.start(
[
:organization,
:dyn_irve_likely,
:rows,
:dataset_url,
:valid
],
sort_specs: [desc: :rows],
max_width: :infinity
)

IO.ANSI.Table.format(rows)
IO.ANSI.Table.stop()

IO.ANSI.Table.start(
[
:organization,
:rows,
:local_valid,
:local_validation_errors
],
sort_specs: [desc: :rows],
max_width: :infinity
)

exploded_rows =
rows
|> Enum.flat_map(fn r ->
r[:local_validation_errors]
|> Enum.map(fn x -> r |> Map.put(:local_validation_errors, x) end)
end)

IO.ANSI.Table.format(exploded_rows)

0 comments on commit 61eabf1

Please sign in to comment.