diff --git a/cmd/servus-boardgames/main.go b/cmd/servus-boardgames/main.go
index 362c8d4..7b6617d 100644
--- a/cmd/servus-boardgames/main.go
+++ b/cmd/servus-boardgames/main.go
@@ -7,12 +7,14 @@ import (
 	"github.com/DictumMortuum/servus-extapi/pkg/config"
 	"github.com/DictumMortuum/servus-extapi/pkg/middleware"
 	"github.com/DictumMortuum/servus-extapi/pkg/queries"
+	"github.com/DictumMortuum/servus-extapi/pkg/queue"
+	"github.com/adjust/rmq/v5"
 	"github.com/gin-gonic/gin"
 )
 
 func Version(c *gin.Context) {
 	rs := map[string]any{
-		"version": "v0.0.10",
+		"version": "v0.0.11",
 	}
 	c.AbortWithStatusJSON(200, rs)
 }
@@ -23,10 +25,19 @@ func main() {
 		log.Fatal(err)
 	}
 
+	connection, err := rmq.OpenConnection("handler", "tcp", "localhost:6379", 2, nil)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	go queue.Cleaner(connection)
+	go Consumer(connection)
+
 	r := gin.Default()
 	r.Use(middleware.Cors())
 	g := r.Group("/boardgames")
 	g.GET("/version", Version)
+	g.GET("/queue", queue.GetStats(connection, "", ""))
 
 	g.GET(
 		"/all",
@@ -64,5 +75,21 @@ func main() {
 		middleware.Result,
 	)
 
+	g.POST(
+		"/scrape/url/:id",
+		middleware.Id,
+		middleware.Body,
+		adapter.A(ScrapeUrl),
+		middleware.Result,
+	)
+
+	g.POST(
+		"/scrape/:id",
+		middleware.Id,
+		middleware.Body,
+		adapter.A(ScrapeStore),
+		middleware.Result,
+	)
+
 	r.Run(":10002")
 }
diff --git a/cmd/servus-boardgames/queue.go b/cmd/servus-boardgames/queue.go
new file mode 100644
index 0000000..caa7574
--- /dev/null
+++ b/cmd/servus-boardgames/queue.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"time"
+
+	"github.com/DictumMortuum/servus-extapi/pkg/db"
+	"github.com/DictumMortuum/servus-extapi/pkg/scrape"
+	"github.com/adjust/rmq/v5"
+	"github.com/jmoiron/sqlx"
+)
+
+func setUrlToScraped(DB *sqlx.DB, payload map[string]any) error {
+	_, err := DB.NamedExec(`
+		update
+			tscrapeurl
+		set
+			last_scraped = :scraped,
+			last_instock = :instock,
+			last_preorder = :preorder,
+			last_outofstock = :outofstock,
+			last_pages = :pages_visited
+		where
+			id = :id
+	`, payload)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func consumeFn(task scrape.GenericScrapeRequest) error {
+	DB, err := db.DatabaseX()
+	if err != nil {
+		return err
+	}
+	defer DB.Close()
+
+	sc, err := getScrape(DB, task.ScrapeUrl.ScrapeId)
+	if err != nil {
+		return err
+	}
+
+	// A list-only run performs no inserts, so skip the stale/cleanup cycle;
+	// otherwise every price of the store would end up deleted.
+	if !task.ListOnly {
+		err = scrape.Stale(DB, sc.StoreId)
+		if err != nil {
+			return err
+		}
+	}
+
+	payload, _, err := scrape.GenericScrape(*sc, DB, task)
+	if err != nil {
+		return err
+	}
+
+	if !task.ListOnly {
+		err = scrape.Cleanup(DB, sc.StoreId)
+		if err != nil {
+			return err
+		}
+	}
+
+	log.Println(task.ListOnly, payload)
+	err = setUrlToScraped(DB, payload)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func Consumer(conn rmq.Connection) {
+	queue, err := conn.OpenQueue("scrape")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	err = queue.StartConsuming(10, time.Second)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	_, err = queue.AddConsumerFunc("scraper", func(d rmq.Delivery) {
+		var task scrape.GenericScrapeRequest
+		err := json.Unmarshal([]byte(d.Payload()), &task)
+		if err != nil {
+			log.Println(err)
+			d.Reject()
+			return
+		}
+
+		err = consumeFn(task)
+		if err != nil {
+			log.Println(err)
+			d.Reject()
+			return
+		}
+
+		d.Ack()
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+}
diff --git a/cmd/servus-boardgames/scrape.go b/cmd/servus-boardgames/scrape.go
new file mode 100644
index 0000000..61dcd7b
--- /dev/null
+++ b/cmd/servus-boardgames/scrape.go
@@ -0,0 +1,222 @@
+package main
+
+import (
+	"encoding/json"
+
+	"github.com/DictumMortuum/servus-extapi/pkg/model"
"github.com/DictumMortuum/servus-extapi/pkg/queries" + "github.com/DictumMortuum/servus-extapi/pkg/scrape" + "github.com/jmoiron/sqlx" +) + +func getScrape(DB *sqlx.DB, id int64) (*model.Scrape, error) { + sc := model.Scrape{} + err := DB.Get(&sc, ` + select + * + from + tscrape + where + id = ? + `, id) + if err != nil { + return nil, err + } + + return &sc, nil +} + +func getScrapeUrl(DB *sqlx.DB, id int64) (*model.ScrapeUrl, error) { + u := model.ScrapeUrl{} + err := DB.Get(&u, ` + select + * + from + tscrapeurl + where + id = ? + `, id) + if err != nil { + return nil, err + } + + return &u, nil +} + +func getScrapeUrls(DB *sqlx.DB, id int64) ([]model.ScrapeUrl, error) { + u := []model.ScrapeUrl{} + err := DB.Select(&u, ` + select + * + from + tscrapeurl + where + scrape_id = ? + `, id) + if err != nil { + return nil, err + } + + return u, nil +} + +func setUrlToPending(DB *sqlx.DB, id int64) error { + _, err := DB.Exec(` + update + tscrapeurl + set + last_scraped = NULL, + last_instock = NULL, + last_preorder = NULL, + last_outofstock = NULL, + last_pages = NULL + where + id = ? + `, id) + if err != nil { + return err + } + + return nil +} + +func scrapeSingle(req *model.Map, r *scrape.GenericScrapeRequest) error { + conn, err := req.GetRmq() + if err != nil { + return err + } + + DB, err := req.GetDB() + if err != nil { + return err + } + + scrape, err := conn.OpenQueue("scrape") + if err != nil { + return err + } + + raw, err := json.Marshal(r) + if err != nil { + return err + } + + err = scrape.Publish(string(raw)) + if err != nil { + return err + } + + err = setUrlToPending(DB, r.ScrapeUrl.Id) + if err != nil { + return err + } + + return nil +} + +type scrapeBody struct { + ListOnly bool `json:"list_only"` +} + +func ScrapeUrl(req *model.Map, res *model.Map) error { + id, err := req.GetInt64("id") + if err != nil { + return err + } + + DB, err := req.GetDB() + if err != nil { + return err + } + + body, err := req.GetByte("body") + if err != nil { + return err + } + + var payload scrapeBody + err = json.Unmarshal(body, &payload) + if err != nil { + return err + } + + u, err := getScrapeUrl(DB, id) + if err != nil { + return err + } + + cfg, err := queries.GetConfig(DB, "SCRAPE_CACHE") + if err != nil { + return err + } + + r := scrape.GenericScrapeRequest{ + ScrapeUrl: *u, + Cache: cfg.Value, + ListOnly: payload.ListOnly, + } + + err = scrapeSingle(req, &r) + if err != nil { + return err + } + + res.SetInternal(map[string]any{ + "req": r, + }) + + return nil +} + +func ScrapeStore(req *model.Map, res *model.Map) error { + id, err := req.GetInt64("id") + if err != nil { + return err + } + + DB, err := req.GetDB() + if err != nil { + return err + } + + body, err := req.GetByte("body") + if err != nil { + return err + } + + var payload scrapeBody + err = json.Unmarshal(body, &payload) + if err != nil { + return err + } + + sc, err := getScrape(DB, id) + if err != nil { + return err + } + + u, err := getScrapeUrls(DB, sc.Id) + if err != nil { + return err + } + + cfg, err := queries.GetConfig(DB, "SCRAPE_CACHE") + if err != nil { + return err + } + + for _, url := range u { + r := scrape.GenericScrapeRequest{ + ScrapeUrl: url, + Cache: cfg.Value, + ListOnly: payload.ListOnly, + } + + err = scrapeSingle(req, &r) + if err != nil { + return err + } + } + + return nil +} diff --git a/go.mod b/go.mod index ed3cf7a..dfdcd6d 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,9 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/BurntSushi/toml v1.4.0 // indirect 
 	github.com/PuerkitoBio/goquery v1.8.1 // indirect
+	github.com/adjust/rmq/v5 v5.2.0 // indirect
+	github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
+	github.com/alicebob/miniredis/v2 v2.30.4 // indirect
 	github.com/andybalholm/cascadia v1.3.2 // indirect
 	github.com/antchfx/htmlquery v1.3.0 // indirect
 	github.com/antchfx/xmlquery v1.3.17 // indirect
@@ -103,6 +106,7 @@ require (
 	github.com/ysmood/got v0.40.0 // indirect
 	github.com/ysmood/gson v0.7.3 // indirect
 	github.com/ysmood/leakless v0.9.0 // indirect
+	github.com/yuin/gopher-lua v1.1.0 // indirect
 	golang.org/x/arch v0.13.0 // indirect
 	golang.org/x/crypto v0.32.0 // indirect
 	golang.org/x/sys v0.29.0 // indirect
diff --git a/go.sum b/go.sum
index 34f4849..62a5e31 100644
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,15 @@ github.com/MicahParks/keyfunc/v2 v2.1.0/go.mod h1:rW42fi+xgLJ2FRRXAfNx9ZA8WpD4Oe
 github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
 github.com/PuerkitoBio/goquery v1.8.1 h1:uQxhNlArOIdbrH1tr0UXwdVFgDcZDrZVdcpygAcwmWM=
 github.com/PuerkitoBio/goquery v1.8.1/go.mod h1:Q8ICL1kNUJ2sXGoAhPGUdYDJvgQgHzJsnnd3H7Ho5jQ=
+github.com/adjust/rmq/v5 v5.2.0 h1:ENPC+3i8N/LAvAfHpEpTMVl7q8zmwh4nl+hhxkao6KE=
+github.com/adjust/rmq/v5 v5.2.0/go.mod h1:FfA6MzYJHeLbuATsNYaZYZaISyxxADDXQLN9QBroFCw=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
+github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
+github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
+github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo=
+github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
 github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY=
 github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
@@ -64,6 +71,9 @@ github.com/bytedance/sonic/loader v0.2.2/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFos
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
 github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -447,6 +457,8 @@ github.com/ysmood/leakless v0.9.0 h1:qxCG5VirSBvmi3uynXFkcnLMzkphdh3xx5FtrORwDCU
 github.com/ysmood/leakless v0.9.0/go.mod h1:R8iAXPRaG97QJwqxs74RdwzcRHT1SWCGTNqY8q0JvMQ=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
+github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
 github.com/ziutek/telnet v0.0.0-20180329124119-c3b780dc415b h1:VfPXB/wCGGt590QhD1bOpv2J/AmC/RJNTg/Q59HKSB0=
 github.com/ziutek/telnet v0.0.0-20180329124119-c3b780dc415b/go.mod h1:IZpXDfkJ6tWD3PhBK5YzgQT+xJWh7OsdwiG8hA2MkO4=
 github.com/ziutek/telnet v0.1.0 h1:Fds2AqweYyoRHX/5X8ikiyqIcSl156Sf2xCvURfqXHA=
@@ -520,6 +532,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/pkg/model/map.go b/pkg/model/map.go
index e22a3b6..5b95160 100644
--- a/pkg/model/map.go
+++ b/pkg/model/map.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/DictumMortuum/servus-extapi/pkg/config"
 	"github.com/DictumMortuum/servus-extapi/pkg/util"
+	"github.com/adjust/rmq/v5"
 	"github.com/gin-gonic/gin"
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/jmoiron/sqlx"
@@ -177,3 +178,22 @@ func (m *Map) GetRedis() (*redis.Client, error) {
 		return rdb, nil
 	}
 }
+
+func (m *Map) GetRmq() (rmq.Connection, error) {
+	if val, ok := m.Internal["rmq"]; ok {
+		conn, ok := val.(rmq.Connection)
+		if !ok {
+			return nil, fmt.Errorf("internal rmq value is not an rmq.Connection")
+		} else {
+			return conn, nil
+		}
+	} else {
+		connection, err := rmq.OpenConnection("handler", "tcp", "localhost:6379", 2, nil)
+		if err != nil {
+			return nil, fmt.Errorf("error opening rmq connection: %w", err)
+		}
+
+		m.Internal["rmq"] = connection
+		return connection, nil
+	}
+}
diff --git a/pkg/model/scrape.go b/pkg/model/scrape.go
new file mode 100644
index 0000000..1a3a6fa
--- /dev/null
+++ b/pkg/model/scrape.go
@@ -0,0 +1,108 @@
+package model
+
+import (
+	"encoding/json"
+
+	"github.com/DictumMortuum/servus/pkg/models"
+	"gorm.io/gorm"
+	"gorm.io/gorm/clause"
+)
+
+type Scrape struct {
+	Id                int64                 `gorm:"primaryKey" json:"id"`
+	StoreId           int64                 `gorm:"foreignkey" json:"store_id"`
+	StoreName         string                `gorm:"-" json:"store_name"`
+	SelItem           string                `json:"sel_item"`
+	SelName           string                `json:"sel_name"`
+	SelItemThumb      string                `json:"sel_item_thumb"`
+	SelItemInstock    models.JsonNullString `json:"sel_item_instock"`
+	SelItemPreorder   models.JsonNullString `json:"sel_item_preorder"`
+	SelItemOutofstock models.JsonNullString `json:"sel_item_outofstock"`
+	SelPrice          string                `json:"sel_price"`
+	SelAltPrice       models.JsonNullString `json:"sel_alt_price"`
+	SelOriginalPrice  string                `json:"sel_original_price"`
+	SelUrl            string                `json:"sel_url"`
+	SelNext           string                `json:"sel_next"`
+	Tag               string                `json:"tag"`
+	AllowedDomain     string                `json:"allowed_domain"`
`json:"allowed_domain"` + AbsoluteNextUrl bool `json:"absolute_next_url"` + URLs []ScrapeUrl `json:"urls"` +} + +func (Scrape) TableName() string { + return "tscrape" +} + +func (Scrape) DefaultFilter(db *gorm.DB) *gorm.DB { + return db +} + +func (c Scrape) List(db *gorm.DB, scopes ...func(*gorm.DB) *gorm.DB) (any, error) { + var data []Scrape + rs := db.Scopes(scopes...).Preload("URLs").Scopes(c.DefaultFilter).Find(&data) + return data, rs.Error +} + +func (Scrape) Get(db *gorm.DB, id int64) (any, error) { + var data Scrape + rs := db.Preload("URLs").First(&data, id) + return data, rs.Error +} + +func (obj Scrape) Update(db *gorm.DB, id int64, body []byte) (any, error) { + var payload Scrape + err := json.Unmarshal(body, &payload) + if err != nil { + return nil, err + } + + // delete(payload, "store_name") + + // rs := db.Model(&model).Save(payload) + // if rs.Error != nil { + // return nil, err + // } + + err = db.Debug().Model(&payload).Association("URLs").Unscoped().Replace(payload.URLs) + if err != nil { + return err, nil + } + + rs := db.Clauses(clause.OnConflict{ + UpdateAll: true, + }).Session(&gorm.Session{ + FullSaveAssociations: true, + }).Create(&payload) + if rs.Error != nil { + return nil, err + } + + return obj.Get(db, id) +} + +func (Scrape) Create(db *gorm.DB, body []byte) (any, error) { + var payload Scrape + err := json.Unmarshal(body, &payload) + if err != nil { + return nil, err + } + + rs := db.Debug().Clauses(clause.OnConflict{ + UpdateAll: true, + }).Session(&gorm.Session{ + FullSaveAssociations: true, + }).Create(&payload) + if rs.Error != nil { + return nil, err + } + + return payload, nil +} + +func (obj Scrape) Delete(db *gorm.DB, id int64) (any, error) { + data, err := obj.Get(db, id) + if err != nil { + return nil, err + } + + rs := db.Delete(&Scrape{}, id) + if rs.Error != nil { + return nil, rs.Error + } + + return data, nil +} diff --git a/pkg/model/scrapeurl.go b/pkg/model/scrapeurl.go new file mode 100644 index 0000000..7ed8aa4 --- /dev/null +++ b/pkg/model/scrapeurl.go @@ -0,0 +1,88 @@ +package model + +import ( + "database/sql" + "encoding/json" + + "gorm.io/gorm" +) + +type ScrapeUrl struct { + Id int64 `gorm:"primaryKey" json:"id"` + StoreId int64 `gorm:"foreignkey" json:"store_id"` + ScrapeId int64 `gorm:"foreignkey" json:"scrape_id"` + Url string `json:"url"` + LastScraped sql.NullInt32 `json:"last_scraped"` + LastInstock sql.NullInt32 `json:"last_instock"` + LastPreorder sql.NullInt32 `json:"last_preorder"` + LastOutofstock sql.NullInt32 `json:"last_outofstock"` + LastPages sql.NullInt32 `json:"last_pages"` +} + +func (ScrapeUrl) TableName() string { + return "tscrapeurl" +} + +func (ScrapeUrl) DefaultFilter(db *gorm.DB) *gorm.DB { + return db +} + +func (c ScrapeUrl) List(db *gorm.DB, scopes ...func(*gorm.DB) *gorm.DB) (any, error) { + var data []ScrapeUrl + rs := db.Scopes(scopes...).Scopes(c.DefaultFilter).Find(&data) + return data, rs.Error +} + +func (ScrapeUrl) Get(db *gorm.DB, id int64) (any, error) { + var data ScrapeUrl + rs := db.First(&data, id) + return data, rs.Error +} + +func (obj ScrapeUrl) Update(db *gorm.DB, id int64, body []byte) (any, error) { + model := ScrapeUrl{ + Id: id, + } + + var payload map[string]any + err := json.Unmarshal(body, &payload) + if err != nil { + return nil, err + } + + rs := db.Model(&model).Save(payload) + if rs.Error != nil { + return nil, err + } + + return obj.Get(db, id) +} + +func (ScrapeUrl) Create(db *gorm.DB, body []byte) (any, error) { + var payload ScrapeUrl + err := json.Unmarshal(body, 
&payload) + if err != nil { + return nil, err + } + + rs := db.Create(&payload) + if rs.Error != nil { + return nil, err + } + + return payload, nil +} + +func (obj ScrapeUrl) Delete(db *gorm.DB, id int64) (any, error) { + data, err := obj.Get(db, id) + if err != nil { + return nil, err + } + + rs := db.Delete(&ScrapeUrl{}, id) + if rs.Error != nil { + return nil, rs.Error + } + + return data, nil +} diff --git a/pkg/queries/config.go b/pkg/queries/config.go new file mode 100644 index 0000000..54972cb --- /dev/null +++ b/pkg/queries/config.go @@ -0,0 +1,16 @@ +package queries + +import ( + "github.com/DictumMortuum/servus-extapi/pkg/model" + "github.com/jmoiron/sqlx" +) + +func GetConfig(DB *sqlx.DB, c string) (*model.Configuration, error) { + var cfg model.Configuration + err := DB.Get(&cfg, `select * from tconfig where config = ?`, c) + if err != nil { + return nil, err + } + + return &cfg, nil +} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go new file mode 100644 index 0000000..79d2b75 --- /dev/null +++ b/pkg/queue/queue.go @@ -0,0 +1,34 @@ +package queue + +import ( + "log" + "net/http" + "time" + + "github.com/adjust/rmq/v5" + "github.com/gin-gonic/gin" +) + +func GetStats(connection rmq.Connection, layout string, refresh string) gin.HandlerFunc { + return func(ctx *gin.Context) { + queues, err := connection.GetOpenQueues() + if err != nil { + log.Fatal(err) + } + + stats, err := connection.CollectStats(queues) + if err != nil { + log.Fatal(err) + } + + ctx.Data(http.StatusOK, "text/html; charset=utf-8", []byte(stats.GetHtml(layout, refresh))) + } +} + +func Cleaner(connection rmq.Connection) { + cleaner := rmq.NewCleaner(connection) + + for range time.Tick(time.Minute) { + cleaner.Clean() + } +} diff --git a/pkg/scrape/common.go b/pkg/scrape/common.go index 7a4d369..1236cb0 100644 --- a/pkg/scrape/common.go +++ b/pkg/scrape/common.go @@ -2,6 +2,7 @@ package scrape import ( "regexp" + "sort" "strconv" "strings" @@ -177,3 +178,32 @@ func getURL(raw string) []string { xurl := xurls.Strict() return xurl.FindAllString(raw, -1) } + +func unique(col []map[string]any) []map[string]any { + temp := map[string]map[string]any{} + + for _, item := range col { + if val, ok := item["name"]; ok { + if name, ok := val.(string); ok { + name = strings.TrimSpace(name) + + if name == "" { + continue + } + + temp[name] = item + } + } + } + + rs := []map[string]any{} + for _, val := range temp { + rs = append(rs, val) + } + + sort.Slice(rs, func(i int, j int) bool { + return rs[i]["name"].(string) > rs[j]["name"].(string) + }) + + return rs +} diff --git a/pkg/scrape/db.go b/pkg/scrape/db.go new file mode 100644 index 0000000..789211c --- /dev/null +++ b/pkg/scrape/db.go @@ -0,0 +1,110 @@ +package scrape + +import ( + "github.com/jmoiron/sqlx" +) + +func Insert(DB *sqlx.DB, payload map[string]any) (int64, error) { + if val, ok := payload["price"]; ok { + if val.(float64) == 0 { + return -1, nil + } + } + + if val, ok := payload["store_thumb"]; ok { + if val.(string) == "" { + payload["store_thumb"] = "https://placehold.co/200x200" + } + } + + if _, ok := payload["tag"]; !ok { + payload["tag"] = "" + } + + q := ` + insert into tprices ( + name, + store_id, + store_thumb, + price, + original_price, + stock, + url, + deleted, + boardgame_id, + created, + updated, + tag + ) values ( + :name, + :store_id, + :store_thumb, + :price, + :original_price, + :stock, + :url, + 0, + NULL, + NOW(), + NOW(), + :tag + ) on duplicate key update + updated = NOW(), + store_thumb = :store_thumb, + price = :price, + 
+			:original_price,
+			:stock,
+			:url,
+			0,
+			NULL,
+			NOW(),
+			NOW(),
+			:tag
+		) on duplicate key update
+			updated = NOW(),
+			store_thumb = :store_thumb,
+			price = :price,
+			original_price = :original_price,
+			stock = :stock,
+			deleted = 0
+	`
+	row, err := DB.NamedExec(q, payload)
+	if err != nil {
+		return -1, err
+	}
+
+	id, err := row.LastInsertId()
+	if err != nil {
+		return -1, err
+	}
+
+	return id, nil
+}
+
+// Stale flags every price of a store as deleted; a scrape then revives the ones it finds again.
+func Stale(DB *sqlx.DB, id int64) error {
+	q := `update tprices set deleted = 1 where store_id = ?`
+	_, err := DB.Exec(q, id)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func Fresh(DB *sqlx.DB, id int64) error {
+	q := `update tprices set deleted = 0 where id = ?`
+	_, err := DB.Exec(q, id)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Cleanup removes the prices that remained flagged as deleted after a scrape.
+func Cleanup(DB *sqlx.DB, id int64) error {
+	q := `delete from tprices where deleted = 1 and store_id = ?`
+	_, err := DB.Exec(q, id)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func Delete(DB *sqlx.DB, id int64) error {
+	q := `delete from tprices where store_id = ?`
+	_, err := DB.Exec(q, id)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/scrape/generic.go b/pkg/scrape/generic.go
new file mode 100644
index 0000000..f4efeaf
--- /dev/null
+++ b/pkg/scrape/generic.go
@@ -0,0 +1,192 @@
+package scrape
+
+import (
+	"fmt"
+	"log"
+	"strings"
+
+	"github.com/DictumMortuum/servus-extapi/pkg/model"
+	"github.com/gocolly/colly/v2"
+	"github.com/jmoiron/sqlx"
+)
+
+func countStock(col []map[string]any, status int) int {
+	count := 0
+
+	for _, item := range col {
+		if val, ok := item["stock"]; ok {
+			if val.(int) == status {
+				count++
+			}
+		}
+	}
+
+	return count
+}
+
+func extractText(e *colly.HTMLElement, s string) string {
+	tmp := strings.Split(s, ",")
+
+	if len(tmp) == 1 {
+		if s[0] == '.' || s[0] == '#' {
+			return e.ChildText(s)
+		} else {
+			return e.Attr(s)
+		}
+	} else if len(tmp) == 2 {
+		return e.ChildAttr(tmp[0], tmp[1])
+	}
+
+	return ""
+}
+
+func extractCmp(e *colly.HTMLElement, s string) bool {
+	cmp := strings.Split(s, "@")
+
+	if len(cmp) != 2 {
+		log.Println("comparison not defined", s)
+		return false
+	}
+
+	tmp := strings.Split(cmp[1], ",")
+
+	switch cmp[0] {
+	case "childHasClass":
+		{
+			return childHasClass(e, tmp[0], tmp[1])
+		}
+	case "hasClass":
+		{
+			return hasClass(e, tmp[0])
+		}
+	default:
+		{
+			if len(tmp) == 2 {
+				return e.ChildText(tmp[0]) == tmp[1]
+			} else if len(tmp) == 3 {
+				return e.ChildAttr(tmp[0], tmp[1]) == tmp[2]
+			}
+		}
+	}
+
+	return false
+}
+
+func extractURL(e *colly.HTMLElement, s string, isAbsolute bool) string {
+	if isAbsolute {
+		return e.Request.AbsoluteURL(extractText(e, s))
+	} else {
+		return extractText(e, s)
+	}
+}
+
+type GenericScrapeRequest struct {
+	ScrapeUrl model.ScrapeUrl
+	Cache     bool
+	ListOnly  bool
+}
+
+func GenericScrape(scraper model.Scrape, DB *sqlx.DB, req GenericScrapeRequest) (map[string]any, []map[string]any, error) {
+	store_id := scraper.StoreId
+	rs := []map[string]any{}
+	pages := []string{req.ScrapeUrl.Url}
+
+	collector := colly.NewCollector(
+		colly.AllowedDomains(scraper.AllowedDomain),
+		user_agent,
+	)
+
+	if req.Cache {
+		collector.CacheDir = CacheDir
+	}
+
+	collector.OnHTML(scraper.SelItem, func(e *colly.HTMLElement) {
+		raw_price := extractText(e, scraper.SelPrice)
+		if scraper.SelAltPrice.Valid {
+			if raw_price == "" {
+				raw_price = extractText(e, scraper.SelAltPrice.String)
+			}
+		}
+
+		old_price := extractText(e, scraper.SelOriginalPrice)
+		if old_price == "" {
+			old_price = raw_price
+		}
+
+		// stock: 0 = in stock, 1 = preorder, 2 = out of stock
+		stock := 0
+
+		if scraper.SelItemInstock.Valid {
+			if extractCmp(e, scraper.SelItemInstock.String) {
+				stock = 0
+			}
+		}
+
+		if scraper.SelItemPreorder.Valid {
+			if extractCmp(e, scraper.SelItemPreorder.String) {
+				stock = 1
+			}
+		}
+
+		if scraper.SelItemOutofstock.Valid {
+			if extractCmp(e, scraper.SelItemOutofstock.String) {
+				stock = 2
+			}
+		}
+
+		item := map[string]any{
+			"name":           extractText(e, scraper.SelName),
+			"store_id":       store_id,
+			"store_thumb":    extractText(e, scraper.SelItemThumb),
+			"stock":          stock,
+			"price":          getPrice(raw_price),
+			"original_price": getPrice(old_price),
+			"url":            extractURL(e, scraper.SelUrl, scraper.AbsoluteNextUrl),
+		}
+
+		rs = append(rs, item)
+	})
+
+	collector.OnHTML(scraper.SelNext, func(e *colly.HTMLElement) {
+		link := extractURL(e, "href", scraper.AbsoluteNextUrl)
+
+		if Debug {
+			log.Println("Visiting: " + link)
+		}
+
+		pages = append(pages, link)
+		collector.Visit(link)
+	})
+
+	collector.Visit(req.ScrapeUrl.Url)
+	collector.Wait()
+
+	uniqueRs := unique(rs)
+
+	inserted := 0
+	if !req.ListOnly {
+		// Insert only the deduplicated items, so "inserted" never exceeds "scraped".
+		for _, item := range uniqueRs {
+			id, err := Insert(DB, item)
+			if err != nil {
+				return nil, nil, fmt.Errorf("could not insert item: %w", err)
+			}
+
+			if id != -1 {
+				inserted++
+			}
+		}
+	}
+
+	return map[string]any{
+		"id":            req.ScrapeUrl.Id,
+		"name":          scraper.StoreName,
+		"store_id":      store_id,
+		"scraped":       len(uniqueRs),
+		"instock":       countStock(uniqueRs, 0),
+		"preorder":      countStock(uniqueRs, 1),
+		"outofstock":    countStock(uniqueRs, 2),
+		"pages_visited": len(pages),
+		"inserted":      inserted,
+	}, uniqueRs, nil
+}