Skip to content

Commit cd61a03

Browse files
authored
Add directory checksums to help disambiguate FVs where only file metadata has changed (#17)
* Support SHA1 of all files in directory * Ignore files outside of root directory * Add sha1_dir * Set SHA1Dir on FeedVersion * Use FetchOptions, FetchResult, ImportOptions, ImportResult to handle complexity * Add DirSHA1 to Overlay test adapter * Improve command help
1 parent c3f3265 commit cd61a03

22 files changed

+208
-124
lines changed

cmd/gotransit/copy.go

+10-6
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"flag"
5+
"fmt"
56

67
"github.com/interline-io/gotransit"
78
"github.com/interline-io/gotransit/copier"
@@ -17,7 +18,6 @@ type basicCopyOptions struct {
1718
allowReferenceErrors bool
1819
extensions arrayFlags
1920
filters arrayFlags
20-
args []string
2121
}
2222

2323
// copyCommand
@@ -27,21 +27,25 @@ type copyCommand struct {
2727

2828
func (cmd *copyCommand) Run(args []string) error {
2929
fl := flag.NewFlagSet("copy", flag.ExitOnError)
30+
fl.Usage = func() {
31+
fmt.Println("Usage: copy <reader> <writer>")
32+
fl.PrintDefaults()
33+
}
3034
fl.Var(&cmd.extensions, "ext", "Include GTFS Extension")
3135
fl.IntVar(&cmd.fvid, "fvid", 0, "Specify FeedVersionID")
3236
fl.BoolVar(&cmd.newfv, "newfv", false, "Create a new FeedVersion from Reader")
3337
fl.BoolVar(&cmd.create, "create", false, "Create")
3438
fl.BoolVar(&cmd.allowEntityErrors, "allow-entity-errors", false, "Allow entity-level errors")
3539
fl.BoolVar(&cmd.allowReferenceErrors, "allow-reference-errors", false, "Allow reference errors")
3640
fl.Parse(args)
37-
cmd.args = fl.Args()
38-
if len(cmd.args) < 2 {
39-
exit("Requires input and output")
41+
if fl.NArg() < 2 {
42+
fl.Usage()
43+
exit("requires input reader and output writer")
4044
}
4145
// Reader / Writer
42-
reader := MustGetReader(cmd.args[0])
46+
reader := MustGetReader(fl.Arg(0))
4347
defer reader.Close()
44-
writer := MustGetWriter(cmd.args[1], cmd.create)
48+
writer := MustGetWriter(fl.Arg(1), cmd.create)
4549
defer writer.Close()
4650
// Setup copier
4751
cp := copier.NewCopier(reader, writer)

cmd/gotransit/extract.go

+11-3
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"flag"
5+
"fmt"
56
"strconv"
67
"strings"
78

@@ -33,6 +34,10 @@ type extractCommand struct {
3334

3435
func (cmd *extractCommand) Run(args []string) error {
3536
fl := flag.NewFlagSet("extract", flag.ExitOnError)
37+
fl.Usage = func() {
38+
fmt.Println("Usage: extract <input> <output>")
39+
fl.PrintDefaults()
40+
}
3641
fl.Var(&cmd.extensions, "ext", "Include GTFS Extension")
3742
fl.IntVar(&cmd.fvid, "fvid", 0, "Specify FeedVersionID")
3843
fl.BoolVar(&cmd.newfv, "newfv", false, "Create a new FeedVersion from Reader")
@@ -55,11 +60,14 @@ func (cmd *extractCommand) Run(args []string) error {
5560
fl.Var(&cmd.extractRouteTypes, "extract-route-type", "Extract Routes matching route_type")
5661
fl.Var(&cmd.extractSet, "set", "Set values on output; format is filename,id,key,value")
5762
fl.Parse(args)
58-
cmd.args = fl.Args()
63+
if fl.NArg() < 2 {
64+
fl.Usage()
65+
exit("requires input reader and output writer")
66+
}
5967
// Reader / Writer
60-
reader := MustGetReader(cmd.args[0])
68+
reader := MustGetReader(fl.Arg(0))
6169
defer reader.Close()
62-
writer := MustGetWriter(cmd.args[1], cmd.create)
70+
writer := MustGetWriter(fl.Arg(1), cmd.create)
6371
defer writer.Close()
6472
// Setup copier
6573
cp := copier.NewCopier(reader, writer)

cmd/gotransit/main.go

+2-2
Original file line number | Diff line number | Diff line change
@@ -140,8 +140,8 @@ func main() {
140140
default:
141141
exit("%q is not valid command.", subc)
142142
}
143-
err = r.Run(args[1:])
143+
err = r.Run(args[1:]) // consume first arg
144144
if err != nil {
145-
exit("Error: %s", err.Error())
145+
exit("error: %s", err.Error())
146146
}
147147
}

cmd/gotransit/validate.go

+9-3
Original file line number | Diff line number | Diff line change
@@ -11,16 +11,22 @@ import (
1111
// validateCommand
1212
type validateCommand struct {
1313
validateExtensions arrayFlags
14-
args []string
1514
}
1615

1716
func (cmd *validateCommand) Run(args []string) error {
1817
fl := flag.NewFlagSet("validate", flag.ExitOnError)
18+
fl.Usage = func() {
19+
fmt.Println("Usage: validate <input>")
20+
fl.PrintDefaults()
21+
}
1922
fl.Var(&cmd.validateExtensions, "ext", "Include GTFS Extension")
2023
fl.Parse(args)
21-
cmd.args = fl.Args()
24+
if fl.NArg() < 1 {
25+
fl.Usage()
26+
exit("requires input reader")
27+
}
2228
//
23-
reader := MustGetReader(cmd.args[0])
29+
reader := MustGetReader(fl.Arg(0))
2430
defer reader.Close()
2531
v, err := validator.NewValidator(reader)
2632
if err != nil {

copier/geomcache_test.go

-1
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,6 @@ func Test_geomCache(t *testing.T) {
3333
trip := trips[stoptimes[0].TripID]
3434
stoptimes2, err := cache.InterpolateStopTimes(trip, stoptimes)
3535
if err != nil {
36-
// fmt.Printf("stoptimes: %#v\n", stoptimes)
3736
t.Error(err)
3837
}
3938
if len(stoptimes) != len(stoptimes2) {

dmfr/dmfr_cmd.go

+60-40
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ func (cmd *Command) Run(args []string) error {
3030
fmt.Println(" sync")
3131
fmt.Println(" import")
3232
fmt.Println(" fetch")
33+
fl.PrintDefaults()
3334
}
3435
fl.Parse(args)
3536
subc := fl.Arg(0)
@@ -55,7 +56,7 @@ func (cmd *Command) Run(args []string) error {
5556
default:
5657
return fmt.Errorf("Invalid command: %q", subc)
5758
}
58-
return r.Run(fl.Args())
59+
return r.Run(fl.Args()[1:]) // consume first arg
5960
}
6061

6162
/////
@@ -74,17 +75,18 @@ type dmfrImportCommand struct {
7475

7576
func (cmd *dmfrImportCommand) Run(args []string) error {
7677
fl := flag.NewFlagSet("import", flag.ExitOnError)
78+
fl.Usage = func() {
79+
fmt.Println("Usage: import [feedids...]")
80+
fl.PrintDefaults()
81+
}
7782
fl.Var(&cmd.extensions, "ext", "Include GTFS Extension")
7883
fl.IntVar(&cmd.workers, "workers", 1, "Worker threads")
7984
fl.StringVar(&cmd.dburl, "dburl", os.Getenv("DMFR_DATABASE_URL"), "Database URL (default: $DMFR_DATABASE_URL)")
8085
fl.StringVar(&cmd.gtfsdir, "gtfsdir", ".", "GTFS Directory")
8186
fl.StringVar(&cmd.coverdate, "date", "", "Service on date")
8287
fl.Uint64Var(&cmd.limit, "limit", 0, "Import at most n feeds")
8388
fl.BoolVar(&cmd.dryrun, "dryrun", false, "Dry run; print feeds that would be imported and exit")
84-
fl.Usage = func() {
85-
fmt.Println("Usage: import [feedids...]")
86-
}
87-
fl.Parse(args[1:])
89+
fl.Parse(args)
8890
cmd.feedids = fl.Args()
8991
if cmd.adapter == nil {
9092
writer := mustGetWriter(cmd.dburl, true)
@@ -146,33 +148,34 @@ func (cmd *dmfrImportCommand) Run(args []string) error {
146148
return nil
147149
}
148150
var wg sync.WaitGroup
149-
jobs := make(chan int, len(qlookup))
150-
results := make(chan FeedVersionImport, len(qlookup))
151+
jobs := make(chan ImportOptions, len(qlookup))
152+
results := make(chan ImportResult, len(qlookup))
151153
for w := 0; w < cmd.workers; w++ {
152154
wg.Add(1)
153-
go dmfrImportWorker(w, cmd.adapter, cmd.extensions, cmd.gtfsdir, jobs, results, &wg)
155+
go dmfrImportWorker(w, cmd.adapter, jobs, results, &wg)
154156
}
155157
for fvid := range qlookup {
156-
jobs <- fvid
158+
jobs <- ImportOptions{FeedVersionID: fvid, Directory: cmd.gtfsdir, Extensions: cmd.extensions}
157159
}
158160
close(jobs)
159161
wg.Wait()
160162
close(results)
161163
// Read out results
162-
for fviresult := range results {
163-
i := qlookup[fviresult.FeedVersionID]
164-
if fviresult.Success {
165-
log.Info("Feed %s (id:%d): FeedVersion %s (id:%d): success: count: %v errors: %v", i.OnestopID, i.FeedID, i.SHA1, fviresult.FeedVersionID, fviresult.EntityCount, fviresult.ErrorCount)
164+
for result := range results {
165+
fvid := result.FeedVersionImport.FeedVersionID
166+
i := qlookup[fvid]
167+
if result.FeedVersionImport.Success {
168+
log.Info("Feed %s (id:%d): FeedVersion %s (id:%d): success: count: %v errors: %v", i.OnestopID, i.FeedID, i.SHA1, fvid, result.FeedVersionImport.EntityCount, result.FeedVersionImport.ErrorCount)
166169
} else {
167-
log.Info("Feed %s (id:%d): FeedVersion %s (id:%d): error: %s", i.OnestopID, i.FeedID, i.SHA1, i.SHA1, fviresult.FeedVersionID, err.Error())
170+
log.Info("Feed %s (id:%d): FeedVersion %s (id:%d): error: %s", i.OnestopID, i.FeedID, i.SHA1, i.SHA1, result.FeedVersionImport.FeedVersionID, err.Error())
168171
}
169172
}
170173
return nil
171174
}
172175

173-
func dmfrImportWorker(id int, adapter gtdb.Adapter, exts []string, gtfsdir string, jobs <-chan int, results chan<- FeedVersionImport, wg *sync.WaitGroup) {
174-
for fvid := range jobs {
175-
fviresult, err := MainImportFeedVersion(adapter, fvid, exts, gtfsdir)
176+
func dmfrImportWorker(id int, adapter gtdb.Adapter, jobs <-chan ImportOptions, results chan<- ImportResult, wg *sync.WaitGroup) {
177+
for opts := range jobs {
178+
fviresult, err := MainImportFeedVersion(adapter, opts)
176179
if err != nil {
177180
log.Info("Error: %s", err.Error())
178181
}
@@ -184,24 +187,27 @@ func dmfrImportWorker(id int, adapter gtdb.Adapter, exts []string, gtfsdir strin
184187
/////
185188

186189
type dmfrFetchCommand struct {
187-
workers int
188-
limit int
189-
dburl string
190-
gtfsdir string
191-
feedids []string
192-
adapter gtdb.Adapter
190+
workers int
191+
limit int
192+
dburl string
193+
gtfsdir string
194+
allowdups bool
195+
feedids []string
196+
adapter gtdb.Adapter
193197
}
194198

195199
func (cmd *dmfrFetchCommand) Run(args []string) error {
196200
fl := flag.NewFlagSet("fetch", flag.ExitOnError)
197201
fl.IntVar(&cmd.workers, "workers", 1, "Worker threads")
198-
fl.IntVar(&cmd.limit, "limit", 0, "Fetch at most n feeds")
199-
fl.StringVar(&cmd.dburl, "dburl", os.Getenv("DMFR_DATABASE_URL"), "Database URL (default: $DMFR_DATABASE_URL)")
202+
fl.IntVar(&cmd.limit, "limit", 0, "Maximum number of feeds to fetch")
203+
fl.StringVar(&cmd.dburl, "dburl", os.Getenv("DMFR_DATABASE_URL"), "Database URL")
200204
fl.StringVar(&cmd.gtfsdir, "gtfsdir", ".", "GTFS Directory")
205+
fl.BoolVar(&cmd.allowdups, "allow-duplicate-contents", false, "Allow duplicate internal SHA1 contents")
201206
fl.Usage = func() {
202207
fmt.Println("Usage: fetch [feedids...]")
208+
fl.PrintDefaults()
203209
}
204-
fl.Parse(args[1:])
210+
fl.Parse(args)
205211
feedids := fl.Args()
206212
if cmd.adapter == nil {
207213
writer := mustGetWriter(cmd.dburl, true)
@@ -229,56 +235,68 @@ func (cmd *dmfrFetchCommand) Run(args []string) error {
229235
if cmd.limit > 0 && cmd.limit < len(feeds) {
230236
feeds = feeds[:cmd.limit]
231237
}
238+
osids := map[int]string{}
239+
for _, feed := range feeds {
240+
osids[feed.ID] = feed.FeedID
241+
}
232242
///////////////
233243
// Here we go
234244
log.Info("Fetching %d feeds", len(feeds))
235245
fetchNew := 0
236246
fetchFound := 0
237247
fetchErrs := 0
238248
var wg sync.WaitGroup
239-
jobs := make(chan Feed, len(feeds))
249+
jobs := make(chan FetchOptions, len(feeds))
240250
results := make(chan FetchResult, len(feeds))
241251
for w := 0; w < cmd.workers; w++ {
242252
wg.Add(1)
243-
go dmfrFetchWorker(w, cmd.adapter, cmd.gtfsdir, jobs, results, &wg)
253+
go dmfrFetchWorker(w, cmd.adapter, jobs, results, &wg)
244254
}
245255
for _, feed := range feeds {
246-
jobs <- feed
256+
opts := FetchOptions{
257+
FeedID: feed.ID,
258+
Directory: cmd.gtfsdir,
259+
IgnoreDuplicateContents: cmd.allowdups,
260+
}
261+
jobs <- opts
247262
}
248263
close(jobs)
249264
wg.Wait()
250265
close(results)
251266
for fr := range results {
267+
osid := osids[fr.FeedVersion.FeedID]
252268
if err != nil {
253-
log.Info("Feed %s (id:%d): url: %s critical error: %s", fr.OnestopID, fr.FeedVersion.FeedID, fr.FeedVersion.URL, err.Error())
269+
log.Info("Feed %s (id:%d): url: %s critical error: %s", osid, fr.FeedVersion.FeedID, fr.FeedVersion.URL, err.Error())
254270
fetchErrs++
255271
} else if fr.FetchError != nil {
256-
log.Info("Feed %s (id:%d): url: %s fetch error: %s", fr.OnestopID, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FetchError.Error())
272+
log.Info("Feed %s (id:%d): url: %s fetch error: %s", osid, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FetchError.Error())
257273
fetchErrs++
258-
} else if fr.Found {
259-
log.Info("Feed %s (id:%d): url: %s found: %s (id:%d)", fr.OnestopID, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FeedVersion.SHA1, fr.FeedVersion.ID)
274+
} else if fr.FoundSHA1 {
275+
log.Info("Feed %s (id:%d): url: %s found zip sha1: %s (id:%d)", osid, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FeedVersion.SHA1, fr.FeedVersion.ID)
276+
fetchFound++
277+
} else if fr.FoundDirSHA1 {
278+
log.Info("Feed %s (id:%d): url: %s found contents sha1: %s (id:%d)", osid, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FeedVersion.SHA1Dir, fr.FeedVersion.ID)
260279
fetchFound++
261280
} else {
262-
log.Info("Feed %s (id:%d): url: %s new: %s (id:%d)", fr.OnestopID, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FeedVersion.SHA1, fr.FeedVersion.ID)
281+
log.Info("Feed %s (id:%d): url: %s new: %s (id:%d)", osid, fr.FeedVersion.FeedID, fr.FeedVersion.URL, fr.FeedVersion.SHA1, fr.FeedVersion.ID)
263282
fetchNew++
264283
}
265284
}
266285
log.Info("Existing: %d New: %d Errors: %d", fetchFound, fetchNew, fetchErrs)
267286
return nil
268287
}
269288

270-
func dmfrFetchWorker(id int, adapter gtdb.Adapter, gtfsdir string, jobs <-chan Feed, results chan<- FetchResult, wg *sync.WaitGroup) {
271-
for feed := range jobs {
289+
func dmfrFetchWorker(id int, adapter gtdb.Adapter, jobs <-chan FetchOptions, results chan<- FetchResult, wg *sync.WaitGroup) {
290+
for opts := range jobs {
272291
var fr FetchResult
273292
err := adapter.Tx(func(atx gtdb.Adapter) error {
274293
var fe error
275-
fr, fe = MainFetchFeed(atx, feed.ID, gtfsdir)
294+
fr, fe = MainFetchFeed(atx, opts)
276295
return fe
277296
})
278297
if err != nil {
279298
fmt.Println("Critical error:", err)
280299
}
281-
fr.OnestopID = feed.FeedID
282300
results <- fr
283301
}
284302
wg.Done()
@@ -294,11 +312,12 @@ type dmfrSyncCommand struct {
294312

295313
func (cmd *dmfrSyncCommand) Run(args []string) error {
296314
fl := flag.NewFlagSet("sync", flag.ExitOnError)
297-
fl.StringVar(&cmd.dburl, "dburl", os.Getenv("DMFR_DATABASE_URL"), "Database URL (default: $DMFR_DATABASE_URL)")
298315
fl.Usage = func() {
299316
fmt.Println("Usage: sync <filenames...>")
317+
fl.PrintDefaults()
300318
}
301-
fl.Parse(args[1:])
319+
fl.StringVar(&cmd.dburl, "dburl", os.Getenv("DMFR_DATABASE_URL"), "Database URL (default: $DMFR_DATABASE_URL)")
320+
fl.Parse(args)
302321
cmd.filenames = fl.Args()
303322
if cmd.adapter == nil {
304323
writer := mustGetWriter(cmd.dburl, true)
@@ -319,6 +338,7 @@ func (dmfrValidateCommand) Run(args []string) error {
319338
fl := flag.NewFlagSet("validate", flag.ExitOnError)
320339
fl.Usage = func() {
321340
fmt.Println("Usage: validate <filenames...>")
341+
fl.PrintDefaults()
322342
}
323343
fl.Parse(args)
324344
if fl.NArg() == 0 {

dmfr/dmfr_cmd_test.go

+7-10
Original file line number | Diff line number | Diff line change
@@ -26,9 +26,9 @@ func Test_dmfrSyncCommand(t *testing.T) {
2626
errContains string
2727
command []string
2828
}{
29-
{2, "", []string{"sync", "../testdata/dmfr/example.json"}},
30-
{4, "", []string{"sync", "../testdata/dmfr/example.json", "../testdata/dmfr/bayarea.dmfr.json"}},
31-
{0, "no such file", []string{"sync", "../testdaata/dmfr/does-not-exist.json"}},
29+
{2, "", []string{"../testdata/dmfr/example.json"}},
30+
{4, "", []string{"../testdata/dmfr/example.json", "../testdata/dmfr/bayarea.dmfr.json"}},
31+
{0, "no such file", []string{"../testdaata/dmfr/does-not-exist.json"}},
3232
}
3333
_ = cases
3434
for _, exp := range cases {
@@ -83,10 +83,10 @@ func Test_dmfrFetchCommand(t *testing.T) {
8383
gtfsdir string
8484
command []string
8585
}{
86-
{1, "", []Feed{f200}, "", []string{"fetch"}},
87-
{1, "", []Feed{f200, f404}, "", []string{"fetch", "f--200", "f--404"}},
88-
{1, "", []Feed{f200, f404}, tmpdir, []string{"fetch", "-gtfsdir", tmpdir, "f--200"}},
89-
{0, "", []Feed{f200, f404}, "", []string{"fetch", "f--404"}},
86+
{1, "", []Feed{f200}, "", []string{}},
87+
{1, "", []Feed{f200, f404}, "", []string{"f--200", "f--404"}},
88+
{1, "", []Feed{f200, f404}, tmpdir, []string{"-gtfsdir", tmpdir, "f--200"}},
89+
{0, "", []Feed{f200, f404}, "", []string{"f--404"}},
9090
}
9191
_ = cases
9292
for _, exp := range cases {
@@ -108,9 +108,6 @@ func Test_dmfrFetchCommand(t *testing.T) {
108108
if len(feeds) != len(exp.feeds) {
109109
t.Errorf("got %d feeds, expect %d", len(feeds), len(exp.feeds))
110110
}
111-
// for _, feed := range feeds {
112-
// fmt.Printf("Feed: %#v\n", feed)
113-
// }
114111
fvs := []gotransit.FeedVersion{}
115112
testdb.ShouldSelect(t, adapter, &fvs, "SELECT * FROM feed_versions")
116113
if len(fvs) != exp.fvcount {

0 commit comments

Comments (0)