[boardgames] introducing a background mechanism for scraping websites using queues
DictumMortuum committed Jan 26, 2025
1 parent a80508e commit daa3125
Showing 13 changed files with 968 additions and 1 deletion.
29 changes: 28 additions & 1 deletion cmd/servus-boardgames/main.go
@@ -7,12 +7,14 @@ import (
"github.com/DictumMortuum/servus-extapi/pkg/config"
"github.com/DictumMortuum/servus-extapi/pkg/middleware"
"github.com/DictumMortuum/servus-extapi/pkg/queries"
"github.com/DictumMortuum/servus-extapi/pkg/queue"
"github.com/adjust/rmq/v5"
"github.com/gin-gonic/gin"
)

func Version(c *gin.Context) {
rs := map[string]any{
"version": "v0.0.10",
"version": "v0.0.11",
}
c.AbortWithStatusJSON(200, rs)
}
@@ -23,10 +25,19 @@ func main() {
log.Fatal(err)
}

connection, err := rmq.OpenConnection("handler", "tcp", "localhost:6379", 2, nil)
if err != nil {
log.Fatal(err)
}

go queue.Cleaner(connection)
go Consumer(connection)

r := gin.Default()
r.Use(middleware.Cors())
g := r.Group("/boardgames")
g.GET("/version", Version)
g.GET("/queue", queue.GetStats(connection, "", ""))

g.GET(
"/all",
@@ -64,5 +75,21 @@ func main() {
middleware.Result,
)

g.POST(
"/scrape/url/:id",
middleware.Id,
middleware.Body,
adapter.A(ScrapeUrl),
middleware.Result,
)

g.POST(
"/scrape/:id",
middleware.Id,
middleware.Body,
adapter.A(ScrapeStore),
middleware.Result,
)

r.Run(":10002")
}
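For context, a minimal client-side sketch of the two new scrape routes. The paths, port and list_only body field come straight from this diff; the target id and host are hypothetical, and the response shape depends on middleware.Result, so it is simply printed raw:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Enqueue a scrape for a single tscrapeurl row (id 1 is hypothetical).
	// list_only mirrors the scrapeBody struct in scrape.go.
	body := bytes.NewBufferString(`{"list_only": true}`)
	rs, err := http.Post("http://localhost:10002/boardgames/scrape/url/1", "application/json", body)
	if err != nil {
		panic(err)
	}
	defer rs.Body.Close()

	raw, _ := io.ReadAll(rs.Body)
	fmt.Println(rs.StatusCode, string(raw))
}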
99 changes: 99 additions & 0 deletions cmd/servus-boardgames/queue.go
@@ -0,0 +1,99 @@
package main

import (
"encoding/json"
"log"
"time"

"github.com/DictumMortuum/servus-extapi/pkg/db"
"github.com/DictumMortuum/servus-extapi/pkg/scrape"
"github.com/adjust/rmq/v5"
"github.com/jmoiron/sqlx"
)

// setUrlToScraped persists the results of a finished scrape on its tscrapeurl row.
func setUrlToScraped(DB *sqlx.DB, payload map[string]any) error {
_, err := DB.NamedExec(`
update
tscrapeurl
set
last_scraped = :scraped,
last_instock = :instock,
last_preorder = :preorder,
last_outofstock = :outofstock,
last_pages = :pages_visited
where
id = :id
`, payload)
if err != nil {
return err
}

return nil
}

// consumeFn handles a single queued scrape request: it marks the store's
// data stale, runs the generic scraper, cleans up leftovers and records
// the results on the URL.
func consumeFn(task scrape.GenericScrapeRequest) error {
DB, err := db.DatabaseX()
if err != nil {
return err
}
defer DB.Close()

sc, err := getScrape(DB, task.ScrapeUrl.ScrapeId)
if err != nil {
return err
}

err = scrape.Stale(DB, sc.StoreId)
if err != nil {
return err
}

payload, _, err := scrape.GenericScrape(*sc, DB, task)
if err != nil {
return err
}

err = scrape.Cleanup(DB, sc.StoreId)
if err != nil {
return err
}

log.Println(task.ListOnly, payload)
err = setUrlToScraped(DB, payload)
if err != nil {
return err
}

return nil
}

// Consumer starts consuming the "scrape" queue, acking deliveries that were
// processed successfully and rejecting the rest.
func Consumer(conn rmq.Connection) {
queue, err := conn.OpenQueue("scrape")
if err != nil {
log.Fatal(err)
}

err = queue.StartConsuming(10, time.Second)
if err != nil {
log.Fatal(err)
}

_, err = queue.AddConsumerFunc("scraper", func(d rmq.Delivery) {
	var task scrape.GenericScrapeRequest
	// reject malformed payloads and stop; a delivery must not be acked after a reject
	if err := json.Unmarshal([]byte(d.Payload()), &task); err != nil {
		log.Println(err)
		d.Reject()
		return
	}

	// reject failed scrapes so rmq can return them for inspection or retry
	if err := consumeFn(task); err != nil {
		log.Println(err)
		d.Reject()
		return
	}

	d.Ack()
})
if err != nil {
log.Fatal(err)
}
}
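pkg/queue is not among the files shown in this diff, so the following is only a guess at what queue.Cleaner (started from main.go) looks like — the stock rmq cleaner loop, which periodically returns unacked deliveries of dead connections back to ready:

package queue

import (
	"log"
	"time"

	"github.com/adjust/rmq/v5"
)

// Cleaner is a hypothetical reconstruction; the real pkg/queue is not part of this diff.
func Cleaner(connection rmq.Connection) {
	cleaner := rmq.NewCleaner(connection)
	for range time.Tick(time.Minute) {
		// Clean returns deliveries of stale connections back to ready,
		// so another consumer can pick them up.
		returned, err := cleaner.Clean()
		if err != nil {
			log.Printf("failed to clean: %s", err)
			continue
		}
		if returned > 0 {
			log.Printf("cleaner returned %d deliveries", returned)
		}
	}
}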
222 changes: 222 additions & 0 deletions cmd/servus-boardgames/scrape.go
@@ -0,0 +1,222 @@
package main

import (
"encoding/json"

"github.com/DictumMortuum/servus-extapi/pkg/model"
"github.com/DictumMortuum/servus-extapi/pkg/queries"
"github.com/DictumMortuum/servus-extapi/pkg/scrape"
"github.com/jmoiron/sqlx"
)

func getScrape(DB *sqlx.DB, id int64) (*model.Scrape, error) {
sc := model.Scrape{}
err := DB.Get(&sc, `
select
*
from
tscrape
where
id = ?
`, id)
if err != nil {
return nil, err
}

return &sc, nil
}

func getScrapeUrl(DB *sqlx.DB, id int64) (*model.ScrapeUrl, error) {
u := model.ScrapeUrl{}
err := DB.Get(&u, `
select
*
from
tscrapeurl
where
id = ?
`, id)
if err != nil {
return nil, err
}

return &u, nil
}

func getScrapeUrls(DB *sqlx.DB, id int64) ([]model.ScrapeUrl, error) {
u := []model.ScrapeUrl{}
err := DB.Select(&u, `
select
*
from
tscrapeurl
where
scrape_id = ?
`, id)
if err != nil {
return nil, err
}

return u, nil
}

// setUrlToPending clears the last_* columns so the URL shows up as queued
// until the consumer overwrites them with fresh results.
func setUrlToPending(DB *sqlx.DB, id int64) error {
_, err := DB.Exec(`
update
tscrapeurl
set
last_scraped = NULL,
last_instock = NULL,
last_preorder = NULL,
last_outofstock = NULL,
last_pages = NULL
where
id = ?
`, id)
if err != nil {
return err
}

return nil
}

// scrapeSingle publishes one scrape request on the queue and marks its URL as pending.
func scrapeSingle(req *model.Map, r *scrape.GenericScrapeRequest) error {
conn, err := req.GetRmq()
if err != nil {
return err
}

DB, err := req.GetDB()
if err != nil {
return err
}

// name the queue handle q to avoid shadowing the imported scrape package
q, err := conn.OpenQueue("scrape")
if err != nil {
	return err
}

raw, err := json.Marshal(r)
if err != nil {
	return err
}

err = q.Publish(string(raw))
if err != nil {
	return err
}

err = setUrlToPending(DB, r.ScrapeUrl.Id)
if err != nil {
return err
}

return nil
}

type scrapeBody struct {
ListOnly bool `json:"list_only"`
}

// ScrapeUrl queues a scrape for the single URL identified by the :id route parameter.
func ScrapeUrl(req *model.Map, res *model.Map) error {
id, err := req.GetInt64("id")
if err != nil {
return err
}

DB, err := req.GetDB()
if err != nil {
return err
}

body, err := req.GetByte("body")
if err != nil {
return err
}

var payload scrapeBody
err = json.Unmarshal(body, &payload)
if err != nil {
return err
}

u, err := getScrapeUrl(DB, id)
if err != nil {
return err
}

cfg, err := queries.GetConfig(DB, "SCRAPE_CACHE")
if err != nil {
return err
}

r := scrape.GenericScrapeRequest{
ScrapeUrl: *u,
Cache: cfg.Value,
ListOnly: payload.ListOnly,
}

err = scrapeSingle(req, &r)
if err != nil {
return err
}

res.SetInternal(map[string]any{
"req": r,
})

return nil
}

// ScrapeStore fans out: it queues one scrape request per URL registered for the scraper.
func ScrapeStore(req *model.Map, res *model.Map) error {
id, err := req.GetInt64("id")
if err != nil {
return err
}

DB, err := req.GetDB()
if err != nil {
return err
}

body, err := req.GetByte("body")
if err != nil {
return err
}

var payload scrapeBody
err = json.Unmarshal(body, &payload)
if err != nil {
return err
}

sc, err := getScrape(DB, id)
if err != nil {
return err
}

u, err := getScrapeUrls(DB, sc.Id)
if err != nil {
return err
}

cfg, err := queries.GetConfig(DB, "SCRAPE_CACHE")
if err != nil {
return err
}

for _, url := range u {
r := scrape.GenericScrapeRequest{
ScrapeUrl: url,
Cache: cfg.Value,
ListOnly: payload.ListOnly,
}

err = scrapeSingle(req, &r)
if err != nil {
return err
}
}

return nil
}
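Equally hedged, a sketch of the queue.GetStats(connection, "", "") handler mounted at GET /boardgames/queue, assuming it wraps rmq's built-in stats overview; the two empty-string arguments are guessed to be the layout and refresh parameters of rmq's stats.GetHtml:

package queue

import (
	"github.com/adjust/rmq/v5"
	"github.com/gin-gonic/gin"
)

// GetStats is a hypothetical reconstruction; the real pkg/queue is not part of this diff.
func GetStats(connection rmq.Connection, layout, refresh string) gin.HandlerFunc {
	return func(c *gin.Context) {
		queues, err := connection.GetOpenQueues()
		if err != nil {
			c.AbortWithStatusJSON(500, map[string]any{"error": err.Error()})
			return
		}

		stats, err := connection.CollectStats(queues)
		if err != nil {
			c.AbortWithStatusJSON(500, map[string]any{"error": err.Error()})
			return
		}

		// rmq ships an HTML overview of ready/unacked/rejected counts per queue.
		c.Data(200, "text/html; charset=utf-8", []byte(stats.GetHtml(layout, refresh)))
	}
}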
4 changes: 4 additions & 0 deletions go.mod
@@ -38,6 +38,9 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/PuerkitoBio/goquery v1.8.1 // indirect
github.com/adjust/rmq/v5 v5.2.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/alicebob/miniredis/v2 v2.30.4 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/antchfx/htmlquery v1.3.0 // indirect
github.com/antchfx/xmlquery v1.3.17 // indirect
@@ -103,6 +106,7 @@ require (
github.com/ysmood/got v0.40.0 // indirect
github.com/ysmood/gson v0.7.3 // indirect
github.com/ysmood/leakless v0.9.0 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
golang.org/x/arch v0.13.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/sys v0.29.0 // indirect