feat(storage): Change storage from Parquet to DuckDB
Signed-off-by: Arnau Siches <[email protected]>
arnau committed Sep 29, 2024
1 parent 10585b2 commit 3b29404
Showing 7 changed files with 115 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
data/config.nuon
data/*/*.parquet
.dial.nu
data/dial.db
23 changes: 15 additions & 8 deletions dial/duckdb.nu
@@ -12,19 +12,13 @@
export def run [
command: string # The query to run.
filename: string = "" # The filename of the DuckDB database to use.
--bail # Stop after hitting an error.
] {
let flags = [
{flag: "-bail", value: $bail}
]

let options = $flags | where value == true | get flag | str join ' '

^duckdb $options -jsonlines -c $command $filename
^duckdb -jsonlines -c $command $filename
| lines
| each { from json }
}


# Opens a file or set of files based on the file extension.
#
# Note that DuckDB is able to open CSV, Parquet and JSON by default.
@@ -64,3 +58,16 @@ export def save [
| to json
| ^duckdb -c $"copy \(select * from read_json\('/dev/stdin'\)\) to '($filename)' \(format '($format)'\)"
}

# Attempts to insert the given table into the named DuckDB table, replacing rows whose primary key already exists.
export def upsert [
table_name: string # Target DuckDB table
filename: string # A valid DuckDB database.
]: [table -> table] {
let from_stdin = "(select * from read_json('/dev/stdin'))"

$in
| to json
| ^duckdb -c $"insert or replace into \"($table_name)\" by name ($from_stdin)" $filename
}
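
A minimal usage sketch of the new `duckdb upsert` command, assuming `data/dial.db` already exists and contains the `ticket_status` table from dial/schema.sql; the record values below are invented:

use dial/duckdb.nu

# The piped table is serialised to JSON, read back by DuckDB from stdin via
# read_json, and written with `insert or replace ... by name`, so column order
# does not matter as long as the names match the target table.
[[id, key, source, timestamp, actor, start_status, end_status];
 [1, "ABC-1", "jira", "2024-09-01T10:00:00", "someone", "todo", "in-progress"]]
| duckdb upsert ticket_status "data/dial.db"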

7 changes: 4 additions & 3 deletions dial/jira.nu
@@ -260,10 +260,11 @@ export def "changelog flatten" []: table -> table {
| flatten --all
| where field == "status"
| reject fieldId fieldtype from to field
| rename --column {created: timestamp, fromString: start, toString: end, author: actor}
| rename --column {created: timestamp, fromString: start_status, toString: end_status, author: actor}
| insert source "jira"
| update start { normalise-status }
| update end { normalise-status }
| update start_status { normalise-status }
| update end_status { normalise-status }
| select id key source timestamp actor start_status end_status
}


1 change: 1 addition & 0 deletions dial/mod.nu
@@ -5,6 +5,7 @@
# export use token.nu
# export use http.nu
export use config.nu *
export use storage.nu
export use github.nu
export use jira.nu
export use source.nu
14 changes: 7 additions & 7 deletions dial/schema.sql
@@ -48,13 +48,13 @@ create table if not exists ticket (

-- Ticket status transition. E.g. change from 'in-progress' to 'done'.
create table if not exists ticket_status (
id integer not null
, key text not null
, source text not null
, timestamp datetime not null
, actor text not null
, start text not null
, end text not null
id integer not null
, key text not null
, source text not null
, timestamp datetime not null
, actor text not null
, start_status text not null
, end_status text not null

, primary key (id, key, source)
);
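
The composite primary key is what the `insert or replace` path in dial/duckdb.nu keys on: a second insert with the same (id, key, source) replaces the earlier row instead of failing. A sketch against an initialised `data/dial.db` (all values invented):

use dial/duckdb.nu

# The second call reuses (1, 'ABC-1', 'jira'), so it overwrites the row from
# the first call rather than raising a primary-key violation.
duckdb run "insert or replace into ticket_status values (1, 'ABC-1', 'jira', timestamp '2024-09-01 10:00:00', 'bot', 'todo', 'in-progress')" "data/dial.db"
duckdb run "insert or replace into ticket_status values (1, 'ABC-1', 'jira', timestamp '2024-09-01 11:00:00', 'bot', 'in-progress', 'done')" "data/dial.db"
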
67 changes: 39 additions & 28 deletions dial/source.nu
@@ -3,6 +3,7 @@ use config.nu *
use github.nu
use jira.nu
use duckdb.nu
use storage.nu

const allowlist_events = [
assigned
@@ -26,8 +27,7 @@
export def "changeset fetch" [start_date: datetime, end_date: datetime, team: string@"team list names"] {
let orgs = team orgs $team
let start_date = ($start_date | format date "%F")
let end_date = ($start_date | format date "%F")
let filename = $"data/changeset/($orgs | str join ".").($start_date).($end_date).parquet"
let end_date = ($end_date | format date "%F")

let res = github pr list merged -s $start_date {org: $orgs}

@@ -40,12 +40,14 @@ export def "changeset fetch" [start_date: datetime, end_date: datetime, team: st
$res
| get data.items
| github pr list normalise
| tee { if ($in | is-not-empty) { $in | duckdb save -f $filename } }
| tee {
if ($in | is-not-empty) { $in | storage save changeset }
}
| do {
let items = $in

{
filename: $filename
table: changeset
count: ($items | length)
start_date: $start_date
end_date: $end_date
@@ -56,10 +58,10 @@ export def "changeset fetch" [start_date: datetime, end_date: datetime, team: st
# Fetches the timelines for any changeset stored for the given date range.
# The result is persisted in `changeset_timeline`.
export def "changeset timeline fetch" [start_date: datetime, end_date: datetime] {
duckdb open data/changeset/*.parquet
| update resolution_date { into datetime }
| where resolution_date >= $start_date
| where resolution_date <= $end_date
let start_date = $start_date | format date "%F"
let end_date = $end_date | format date "%F"

storage query $"select * from changeset where resolution_date >= date '($start_date)' and resolution_date <= date '($end_date)'"
| par-each {|row|
$row.timeline_url
| github pr timeline-url
@@ -68,6 +70,7 @@ export def "changeset timeline fetch" [start_date: datetime, end_date: datetime]
| reject id?
| insert changeset_id $row.id
| insert repository $row.repository
| upsert source github
}
| flatten
| collect
@@ -87,18 +90,17 @@
}
}
| rename --column {node_id: id}
| select id changeset_id repository timestamp event actor url?
| select id changeset_id repository timestamp event actor url? source
| tee { if ($in | is-not-empty) { $in | storage save changeset_event } }
| do {
if ($in | is-not-empty) {
let groups = $in | group-by --to-table repository
| insert max {|row| $row.items.timestamp | math max }

$groups
| insert repo {|row| $row.group | str replace "/" "+" }
| each {|row|
$row.items | duckdb save -f $"data/changeset_timeline/($row.repo).($start_date | format date "%F").($row.max | format date "%F").parquet"
}
}
let items = $in

{
table: changeset_event
count: ($items | length)
start_date: $start_date
end_date: $end_date
}
}
}

@@ -110,9 +112,6 @@ export def "changeset timeline fetch" [start_date: datetime, end_date: datetime]
# ```
export def "ticket fetch" [start_date: datetime, end_date: datetime, team: string@"team list names"] {
let emails = team members $team | get email
let start_date_s = $start_date | format date "%F"
let end_date_s = $end_date | format date "%F"
let filename = $"data/ticket/($team).($start_date_s).($end_date_s).parquet"

let res = jira ticket fetch $start_date $end_date $emails

@@ -121,12 +120,12 @@ export def "ticket fetch" [start_date: datetime, end_date: datetime, team: strin
| get data?
| flatten
| jira ticket flatten
| tee { if ($in | is-not-empty) { $in | duckdb save -f $filename } }
| tee { if ($in | is-not-empty) { $in | storage save ticket } }
| do {
let items = $in

{
filename: $filename
table: ticket
count: ($items | length)
start_date: $start_date
end_date: $end_date
@@ -137,9 +136,10 @@ export def "ticket fetch" [start_date: datetime, end_date: datetime, team: strin


export def "ticket timeline fetch" [start_date: datetime, end_date: datetime] {
duckdb open data/ticket/*.parquet
| update resolution_date { into datetime }
| where resolution_date >= $start_date and resolution_date <= $end_date
let start_date = $start_date | format date "%F"
let end_date = $end_date | format date "%F"

storage query $"select * from ticket where resolution_date >= date '($start_date)' and resolution_date <= date '($end_date)'"
| par-each {|row|
let res = jira changelog fetch $row.key

@@ -148,9 +148,20 @@ export def "ticket timeline fetch" [start_date: datetime, end_date: datetime] {
| get data?
| flatten
| jira changelog flatten
| duckdb save -f $"data/ticket_timeline/($row.key).parquet"
} else {
fail $"Failed to fetch ($row.key)"
}
}
| flatten
| tee { if ($in | is-not-empty) { $in | storage save ticket_status } }
| do {
let items = $in

{
table: ticket_status
count: ($items | length)
start_date: $start_date
end_date: $end_date
}
}
}
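
The persist-and-summarise shape used throughout this file may be easier to read in isolation. A toy sketch, with `print` standing in for the `storage save` call:

[[id]; [1] [2]]
| tee { if ($in | is-not-empty) { print "would persist here" } }
| do {
    let items = $in

    {table: example, count: ($items | length)}
}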
48 changes: 48 additions & 0 deletions dial/storage.nu
@@ -0,0 +1,48 @@
use error.nu *
use duckdb.nu

def "db run" [sql: string] {
let location = try-env DIAL_DB

duckdb run $sql $location
}


# Initialises a new storage at the location indicated by the `DIAL_DB` environment variable.
export def --env init [] {
let location = if ($env.DIAL_DB? | is-empty) { "data/dial.db" } else { $env.DIAL_DB? }

if ($location | path exists) { return }

# TODO: Assumes the working directory is the repository root, since the schema path is relative.
db run (open dial/schema.sql)
}

def tables [] {
let location = if ($env.DIAL_DB? | is-empty) { "data/dial.db" } else { $env.DIAL_DB? }

duckdb run "select table_name from information_schema.tables where table_schema = 'main' and table_type = 'BASE TABLE';" $location
| get table_name
}

# Attempts to save the given data into the Dial storage.
export def save [table_name: string@"tables"] {
let location = if ($env.DIAL_DB? | is-empty) { "data/dial.db" } else { $env.DIAL_DB? }

if not ($location | path exists) {
fail $"The database in ($location) does not exist. Please run `dial storage init`"
}

$in
| duckdb upsert $table_name $location
}

export def query [sql: string] {
let location = if ($env.DIAL_DB? | is-empty) { "data/dial.db" } else { $env.DIAL_DB? }

if not ($location | path exists) {
fail $"The database in ($location) does not exist. Please run `dial storage init`"
}

duckdb run $sql $location
}
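
A sketch of how the new module fits together, assuming the repository root as the working directory; `DIAL_DB` is set explicitly because the internal `db run` helper requires it, and the row piped into `storage save` is illustrative only:

use dial/storage.nu

$env.DIAL_DB = "data/dial.db"

storage init    # creates data/dial.db from dial/schema.sql on first run
storage query "select table_name from information_schema.tables where table_schema = 'main'"

# The piped columns must match the target table's columns.
[[id, key, source, timestamp, actor, start_status, end_status];
 [2, "ABC-2", "jira", "2024-09-02T09:00:00", "someone", "todo", "done"]]
| storage save ticket_status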
