-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgtfs_rt.go
105 lines (91 loc) · 2.39 KB
/
gtfs_rt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"log"
"net/http"
"time"
"github.com/MobilityData/gtfs-realtime-bindings/golang/gtfs"
"google.golang.org/protobuf/proto"
)
type GtfsRtUpdateData struct {
delaySecs int32
lat float32
lon float32
tripId string
stopId string
}
func addToFeed(msg *gtfs.FeedMessage, data GtfsRtUpdateData) {
pos := gtfs.Position{
Latitude: &data.lat,
Longitude: &data.lon,
}
descr := gtfs.TripDescriptor{
TripId: &data.tripId,
}
update := gtfs.TripUpdate{
/* TODO: TripDescriptor */
Trip: &descr,
Delay: &data.delaySecs,
/*
The API doesn't seem to provide delay data for future stops, so we can only provide an update for the next stop.
MOTIS at least correctly extrapolates the delay to affect all future stops equally.
*/
// TODO: Memorize delays for this trip in the past, to provide to clients which missed previous updates
StopTimeUpdate: []*gtfs.TripUpdate_StopTimeUpdate{
{
StopId: &data.stopId,
Arrival: >fs.TripUpdate_StopTimeEvent{
Delay: &data.delaySecs,
},
},
},
}
vPos := gtfs.VehiclePosition{
Position: &pos,
}
/* Each FeedEntity should contain only one type of update, so we need 2 */
e := gtfs.FeedEntity{
Id: &data.tripId,
TripUpdate: &update,
}
msg.Entity = append(msg.Entity, &e)
e2 := gtfs.FeedEntity{
Id: &data.tripId,
Vehicle: &vPos,
}
msg.Entity = append(msg.Entity, &e2)
}
func createGtfsRtMsg(data []GtfsRtUpdateData) []byte {
var now uint64 = uint64(time.Now().Unix())
var gtfs_version string = "2.0"
feed := gtfs.FeedMessage{}
feed.Header = >fs.FeedHeader{
Timestamp: &now,
GtfsRealtimeVersion: >fs_version,
}
for _, datum := range data {
addToFeed(&feed, datum)
}
buf, err := proto.Marshal(&feed)
if err != nil {
log.Fatalf("Failed to serialize RTFS-RT protobuf message: %s\n", err)
}
if config.verbose {
log.Printf("GTFS-RT message:\n%s\n", feed.String())
}
return buf
}
// Serve feed data via HTTP, updating it when new one comes in from the channel
func listenAndServeFeed(dataChan chan []byte) {
// Block here until we have some data
feedData := <-dataChan
go func() {
for {
// FIXME: Prevent data race by locking this when updating
feedData = <-dataChan
}
}()
http.DefaultServeMux.HandleFunc("/gtfs-rt.pb", func(w http.ResponseWriter, r *http.Request) {
w.Write(feedData)
})
http.ListenAndServe(config.listenAddr, nil)
}