Skip to content

Commit

Permalink
go: part2 of "smart" fetch, .Save() done, needs testing of new eps
Browse files Browse the repository at this point in the history
  • Loading branch information
azimut committed Apr 6, 2024
1 parent ea96f3e commit ca39570
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 161 deletions.
188 changes: 115 additions & 73 deletions backend/src/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func initTables(db *sql.DB) error {
feedid integer not null,
datemillis integer not null,
title text,
url text not null,
url text not null unique,
foreign key(feedid) references feeds(id)
) strict;
create index entriesindex on entries(feedid);
Expand All @@ -61,93 +61,35 @@ func initTables(db *sql.DB) error {
return nil
}

func insertFeedsAndEntries(db *sql.DB, feeds Feeds) error {
// insertSearch populates `search` table.
// Assumes there are already `entries_content` on the db.
func insertSearch(db *sql.DB, lastentryid int) error {
tx, err := db.Begin()
if err != nil {
return err
}
stmt_feeds, err := tx.Prepare("insert into feeds(id,title,url,description) values(?,?,?,?)")
if err != nil {
return err
}
defer stmt_feeds.Close()
stmt_feeds_meta, err := tx.Prepare("insert into feeds_metadata(feedid) values(?)")
stmt, err := tx.Prepare(`
insert into search
select entriesid,content
from entries_content
where entriedid > ?;
`)
if err != nil {
return err
}
defer stmt_feeds_meta.Close()
stmt_entry, err := tx.Prepare(
"insert into entries(feedid,datemillis,title,url) values(?,?,?,?)",
)
defer stmt.Close()
_, err = stmt.Exec(lastentryid)
if err != nil {
return err
}
defer stmt_entry.Close()
stmt_entry_content, err := tx.Prepare(
"insert into entries_content(entriesid,content) values(?,?)",
)
if err != nil {
return err
}
defer stmt_entry_content.Close()
for feedid, feed := range feeds {
_, err = stmt_feeds.Exec(feedid, feed.Title, feed.Url, feed.Description)
if err != nil {
return err
}
_, err = stmt_feeds_meta.Exec(feedid)
if err != nil {
return err
}
for _, entry := range feed.Entries {
// entries
res, err := stmt_entry.Exec(
feedid,
entry.Date.UnixMilli(),
entry.Title,
entry.Url,
)
if err != nil {
return err
}
// entries_content
entryid, err := res.LastInsertId()
if err != nil {
return err
}
_, err = stmt_entry_content.Exec(
entryid,
entry.Content,
)
if err != nil {
return err
}
}
}
err = tx.Commit()
if err != nil {
return err
}

err = insertSearch(db)
if err != nil {
return err
}

return nil
}

// insertSearch populates `search` table.
// Assumes there are already `entries_content` on the db.
func insertSearch(db *sql.DB) error {
sqlStmt := `
insert into search
select entriesid,content
from entries_content;
insert into search(search) values('optimize');
vacuum;
`
_, err := db.Exec(sqlStmt)
vacuum;`
_, err = db.Exec(sqlStmt)
if err != nil {
return err
}
Expand Down Expand Up @@ -212,5 +154,105 @@ func InitDB(dbname string) (db *sql.DB, err error) {
}

// Save writes every feed in feeds, together with its entries and entry
// contents, to db inside a single transaction, and afterwards asks
// insertSearch to index the entries that were added.
//
// For each feed:
//   - when RawLastFetch is zero the feed is treated as never seen before: a
//     row is inserted into feeds and feeds_metadata, and the generated id is
//     used for everything that follows; otherwise feed.RawId is used.
//   - feeds_metadata is updated with the current time, RawLastModified and
//     RawEtag.
//   - every entry is inserted into entries and entries_content. An entry
//     whose insert fails is skipped on purpose (best effort), so that a
//     rejected row does not abort the whole save.
//
// Any other error aborts the save and the transaction is rolled back.
//
// NOTE(review): insertSearch runs after the commit, so if it fails the
// entries stay stored but are not indexed.
func (feeds Feeds) Save(db *sql.DB) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Release the transaction on every early return. After a successful
	// Commit this call only reports sql.ErrTxDone, which is safe to ignore.
	defer tx.Rollback()

	stmtFeeds, err := tx.Prepare("insert into feeds(title,url,description) values(?,?,?)")
	if err != nil {
		return err
	}
	defer stmtFeeds.Close()

	stmtFeedsMetaInit, err := tx.Prepare("insert into feeds_metadata(feedid) values(?)")
	if err != nil {
		return err
	}
	defer stmtFeedsMetaInit.Close()

	stmtEntry, err := tx.Prepare(
		"insert into entries(feedid,datemillis,title,url) values(?,?,?,?)",
	)
	if err != nil {
		return err
	}
	defer stmtEntry.Close()

	stmtEntryContent, err := tx.Prepare(
		"insert into entries_content(entriesid,content) values(?,?)",
	)
	if err != nil {
		return err
	}
	defer stmtEntryContent.Close()

	stmtFeedsMetaUpdate, err := tx.Prepare(`
    UPDATE feeds_metadata
    SET lastfetch = strftime('%s'), lastmodified = ?, etag = ?
    WHERE feedid = ?
    `)
	if err != nil {
		return err
	}
	defer stmtFeedsMetaUpdate.Close()

	// firstNewEntryID is the id of the first entry inserted by this call; it
	// stays 0 when nothing was inserted.
	var firstNewEntryID int64
	for _, feed := range feeds {
		effectiveFeedID := feed.RawId
		if feed.RawLastFetch.IsZero() { // first time seen
			res, err := stmtFeeds.Exec(feed.Title, feed.Url, feed.Description)
			if err != nil {
				return err
			}
			newFeedID, err := res.LastInsertId()
			if err != nil {
				return err
			}
			effectiveFeedID = int(newFeedID)
			if _, err = stmtFeedsMetaInit.Exec(effectiveFeedID); err != nil {
				return err
			}
		}

		_, err = stmtFeedsMetaUpdate.Exec(feed.RawLastModified, feed.RawEtag, effectiveFeedID)
		if err != nil {
			return err
		}

		for _, entry := range feed.Entries {
			res, err := stmtEntry.Exec(
				effectiveFeedID,
				entry.Date.UnixMilli(),
				entry.Title,
				entry.Url,
			)
			if err != nil {
				// Deliberate best effort: without an entries row there is
				// no id to attach content to, so skip this entry.
				continue
			}
			entryID, err := res.LastInsertId()
			if err != nil {
				return err
			}
			// Record into the function-scoped variable; redeclaring it here
			// with := would shadow it and the search index would never be
			// refreshed.
			if firstNewEntryID == 0 {
				firstNewEntryID = entryID
			}
			if _, err = stmtEntryContent.Exec(entryID, entry.Content); err != nil {
				return err
			}
		}
	}

	if err = tx.Commit(); err != nil {
		return err
	}

	if firstNewEntryID > 0 {
		// Pass the id just below the first new row so that a strictly
		// greater-than filter in insertSearch includes every row added by
		// this call. Assumes entry ids grow with each insert - TODO confirm
		// against the entries table definition.
		return insertSearch(db, int(firstNewEntryID-1))
	}
	return nil
}
92 changes: 14 additions & 78 deletions backend/src/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"database/sql"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -31,89 +30,21 @@ type Feed struct {
Description string
}

// persistEtag sets the etag column of the feeds_metadata row whose feedid is
// id, inside its own transaction.
//
// It assumes that row already exists: the result of the UPDATE is not
// inspected, so a missing row is not reported as an error.
//
// It returns the first error from beginning the transaction, preparing or
// executing the statement, or committing. On any failure the transaction is
// rolled back.
func persistEtag(db *sql.DB, id int, etag string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Release the transaction on every early return. After a successful
	// Commit this call only reports sql.ErrTxDone, which is safe to ignore.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
    UPDATE feeds_metadata
    SET etag = ?
    WHERE feedid = ?
    `)
	if err != nil {
		return err
	}
	defer stmt.Close()

	if _, err = stmt.Exec(etag, id); err != nil {
		return err
	}
	return tx.Commit()
}

// persistLastModified updates lastmodified value on feeds_metadata table
// assumes there is already an entry for feedid
func persistLastModified(db *sql.DB, id int, lastmodified string) error {
query := `
UPDATE feeds_metadata
SET lastmodified = ?
WHERE feedid = ?
`
tx, err := db.Begin()
if err != nil {
return err
}

stmt_update, err := tx.Prepare(query)
if err != nil {
return err
}
defer stmt_update.Close()

_, err = stmt_update.Exec(lastmodified, id)
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
}
func (feed *Feed) FetchMetadata() (err error) {

return nil
}

func (feed *Feed) UpdateMetadata(db *sql.DB) error {
res, err := http.Head(feed.Url)
if err != nil {
return err
}

fmt.Printf("%+v\n", res.Header) // output for debug

etags, ok := res.Header["Etag"]
if ok && len(etags) > 0 {
fmt.Printf("found an etag (%s) for url (%s)\n", etags[0], feed.Url)
fmt.Println("old etag: ", feed.RawEtag)
if feed.RawEtag == etags[0] {
return fmt.Errorf("same etag (%s), skipping feed (%s)", feed.RawEtag, feed.Url)
} else {
err = persistEtag(db, feed.RawId, etags[0])
if err != nil {
return err
}
}
feed.RawEtag = etags[0]
}

lastmodified, ok := res.Header["Last-Modified"]
Expand All @@ -126,19 +57,21 @@ func (feed *Feed) UpdateMetadata(db *sql.DB) error {
feed.RawLastModified,
feed.Url,
)
} else {
err = persistLastModified(db, feed.RawId, lastmodified[0])
if err != nil {
return err
}
}
feed.RawLastModified = lastmodified[0]
}

return nil
return
}

func (feed *Feed) Fetch() error {

err := feed.FetchMetadata()
if err != nil {
fmt.Printf("dropping feed with error (%v)\n", err)
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

Expand All @@ -156,6 +89,10 @@ func (feed *Feed) Fetch() error {
html2md := md.NewConverter("", true, nil)

for _, item := range rawFeed.Items {
// Process only NEW entries, after last fetch (avoid INSERT attempts)
if item.PublishedParsed.Before(feed.RawLastFetch) {
continue
}
entry := Entry{
Date: *item.PublishedParsed,
Title: itemTitle(item.Title, *feed),
Expand Down Expand Up @@ -197,7 +134,6 @@ func (feed *Feed) Fetch() error {
}
feed.Entries = append(feed.Entries, entry)
}

return nil
}

Expand Down
14 changes: 4 additions & 10 deletions backend/src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,14 @@ func main() {
feeds := Feeds{}
for _, feed_json := range feeds_json {
fmt.Printf("processing json feed (%s)\n", feed_json.Url)
keep := true
for _, feed_db := range feeds_db {
if feed_db.Url == feed_json.Url {
if err = feed_db.UpdateMetadata(db); err != nil {
keep = false
fmt.Printf("dropping feed with error (%v)\n", err)
} else {
fmt.Printf("to be added feed (%s)\n", feed_db.Url)
}
feed_json.RawId = feed_db.RawId
feed_json.RawEtag = feed_db.RawEtag
feed_json.RawLastFetch = feed_db.RawLastFetch
feed_json.RawLastModified = feed_db.RawLastModified
}
}
if !keep {
continue
}
fmt.Printf("adding feed: %s\n", feed_json.Url)
feeds = append(feeds, feed_json)
}
Expand Down

0 comments on commit ca39570

Please sign in to comment.