-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathschema.go
141 lines (126 loc) · 3.13 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package zenodb
import (
"fmt"
"io/ioutil"
"os"
"strings"
"time"
"github.com/getlantern/yaml"
"github.com/getlantern/zenodb/sql"
)
// Schema maps a table (or view) name to the TableOpts used to create or alter
// it. ApplySchema lowercases the names before using them.
type Schema map[string]*TableOpts
// pollForSchema applies the schema stored in filename once and then hands a
// polling loop to db.Go. The loop re-stats the file every second and re-applies
// the schema whenever the modification time has advanced or the size differs
// from the last observed stat.
//
// It returns an error if the initial stat or the initial apply fails; errors
// encountered later inside the polling loop are only logged.
func (db *DB) pollForSchema(filename string) error {
	lastStat, err := os.Stat(filename)
	if err != nil {
		return err
	}
	if err = db.ApplySchemaFromFile(filename); err != nil {
		db.log.Error(err)
		return err
	}

	// NOTE(review): db.Go presumably runs the closure in the background and
	// signals shutdown through stop — confirm against its definition.
	db.Go(func(stop <-chan interface{}) {
		db.log.Debug("Polling for schema changes")
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
			}

			current, statErr := os.Stat(filename)
			if statErr != nil {
				db.log.Errorf("Unable to stat schema: %v", statErr)
				continue
			}

			unchanged := !current.ModTime().After(lastStat.ModTime()) && current.Size() == lastStat.Size()
			if unchanged {
				continue
			}

			db.log.Debug("Schema file changed, applying")
			if applyErr := db.ApplySchemaFromFile(filename); applyErr != nil {
				db.log.Error(applyErr)
			}
			// Remember the new stat even when applying failed, so a broken
			// file is not re-applied until it changes again.
			lastStat = current
		}
	})

	return nil
}
// ApplySchemaFromFile reads filename, parses its contents as a YAML Schema and
// applies the result with ApplySchema.
//
// A read error is returned as-is. A parse error is logged, together with the
// raw file contents at debug level, and then returned.
func (db *DB) ApplySchemaFromFile(filename string) error {
	contents, readErr := ioutil.ReadFile(filename)
	if readErr != nil {
		return readErr
	}

	var parsed Schema
	if parseErr := yaml.Unmarshal(contents, &parsed); parseErr != nil {
		db.log.Errorf("Error applying schema: %v", parseErr)
		db.log.Debug(string(contents))
		return parseErr
	}

	return db.ApplySchema(parsed)
}
// ApplySchema brings the database in line with _schema: every entry that does
// not yet exist (per db.getTable) is created with db.CreateTable, and every
// entry that already exists is altered with Alter.
//
// Names are lowercased before use. Views are applied after the table they
// select from: each view is attached to its underlying table's dependency
// list, and tables are then emitted followed by their dependents.
//
// The TableOpts values referenced by _schema are modified in place (Name is
// set and dependency lists are appended to).
//
// An error is returned if an entry has no options, if a view's underlying
// table cannot be determined or is missing from the schema, or if creating or
// altering any table fails. Processing stops at the first error.
func (db *DB) ApplySchema(_schema Schema) error {
	schema := make(Schema, len(_schema))
	// Convert all names in schema to lowercase
	for name, opts := range _schema {
		if opts == nil {
			// A key with an empty value would otherwise cause a nil
			// pointer dereference below.
			return fmt.Errorf("No options specified for table %v", name)
		}
		opts.Name = strings.ToLower(name)
		schema[opts.Name] = opts
	}

	// Identify dependencies
	var tables []*TableOpts
	for name, opts := range schema {
		if !opts.View {
			tables = append(tables, opts)
			continue
		}
		dependsOn, err := sql.TableFor(opts.SQL)
		if err != nil {
			return fmt.Errorf("Unable to determine underlying table for view %v: %v", name, err)
		}
		table, found := schema[dependsOn]
		if !found {
			// dependsOn is the missing table, name is the view that needs it.
			return fmt.Errorf("Table %v needed by view %v not found", dependsOn, name)
		}
		table.dependencyOf = append(table.dependencyOf, opts)
	}

	// Apply tables in order of dependencies
	bd := &byDependency{}
	for _, opts := range tables {
		bd.add(opts)
	}
	db.log.Debugf("Applying tables in order: %v", strings.Join(bd.names, ", "))

	for _, opts := range bd.opts {
		name := opts.Name
		t := db.getTable(name)
		tableType := "table"
		if opts.View {
			tableType = "view"
		}
		if t == nil {
			db.log.Debugf("Creating %v '%v' as\n%v", tableType, name, opts.SQL)
			db.log.Debugf("MaxFlushLatency: %v MinFlushLatency: %v", opts.MaxFlushLatency, opts.MinFlushLatency)
			if err := db.CreateTable(opts); err != nil {
				return fmt.Errorf("Error creating table %v: %v", name, err)
			}
			db.log.Debugf("Created %v %v", tableType, name)
		} else {
			db.log.Debugf("Altering %v '%v' as \n%v", tableType, name, opts.SQL)
			if err := t.Alter(opts); err != nil {
				return err
			}
		}
	}
	return nil
}
// byDependency accumulates table options in the order they are added, where
// add places each table ahead of the entries in its dependencyOf list.
type byDependency struct {
// opts holds the table options in the order produced by add.
opts []*TableOpts
// names holds the Name of each entry in opts, at the same index.
names []string
}
// add records opts and its name, then recursively records every entry in
// opts.dependencyOf, so that a table always precedes the entries that depend
// on it.
func (bd *byDependency) add(opts *TableOpts) {
	bd.opts, bd.names = append(bd.opts, opts), append(bd.names, opts.Name)

	dependents := opts.dependencyOf
	for i := range dependents {
		bd.add(dependents[i])
	}
}