Initial refactor of the index/update loop.
This commit is contained in:
parent
b2ac9685ff
commit
d76d367a9c
@ -344,9 +344,7 @@ mod tests {
|
||||
assert_eq!(i.length, Some(15077388));
|
||||
assert_eq!(
|
||||
i.guid,
|
||||
Some(
|
||||
"https://request-for-explanation.github.io/podcast/ep9-a-once-in-a-lifetime-rfc/",
|
||||
)
|
||||
Some("https://request-for-explanation.github.io/podcast/ep9-a-once-in-a-lifetime-rfc/",)
|
||||
);
|
||||
assert_eq!(i.published_date, Some("Mon, 28 Aug 2017 15:00:00 PDT"));
|
||||
// Need to fix datetime parser first
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
use diesel::prelude::*;
|
||||
use diesel;
|
||||
use rss;
|
||||
use reqwest;
|
||||
|
||||
use schema;
|
||||
use dbqueries;
|
||||
@ -27,20 +28,14 @@ pub fn foo() {
|
||||
}
|
||||
}
|
||||
|
||||
index_loop(db).unwrap();
|
||||
index_loop(&db).unwrap();
|
||||
}
|
||||
|
||||
fn insert_source(con: &SqliteConnection, url: &str) -> Result<Source> {
|
||||
let foo = NewSource::new_with_uri(url);
|
||||
|
||||
match dbqueries::load_source(con, foo.uri) {
|
||||
Ok(mut bar) => {
|
||||
// TODO: Cmp first before replacing
|
||||
// FIXME: NewSource has None values for etag, and last_mod atm
|
||||
// bar.set_http_etag(foo.http_etag);
|
||||
// bar.set_last_modified(foo.last_modified);
|
||||
// bar.save_changes::<Source>(con)?;
|
||||
}
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
diesel::insert(&foo)
|
||||
.into(schema::source::table)
|
||||
@ -98,23 +93,22 @@ fn index_episode(con: &SqliteConnection, item: &rss::Item, parent: &Podcast) ->
|
||||
Ok(dbqueries::load_episode(con, &ep.uri.unwrap())?)
|
||||
}
|
||||
|
||||
pub fn index_loop(db: SqliteConnection) -> Result<()> {
|
||||
// let db = ::establish_connection();
|
||||
pub fn index_loop(db: &SqliteConnection) -> Result<()> {
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
|
||||
let f = dbqueries::get_sources(&db);
|
||||
let mut f = fetch_feeds(db)?;
|
||||
|
||||
for feed in f.unwrap().iter_mut() {
|
||||
// info!("{:?}", feed.id());
|
||||
for &mut (ref mut req, ref source) in f.iter_mut() {
|
||||
let mut buf = String::new();
|
||||
req.read_to_string(&mut buf)?;
|
||||
let chan = rss::Channel::from_str(&buf)?;
|
||||
|
||||
// This method will defently get split and nuked
|
||||
// but for now its poc
|
||||
let chan = feed.get_podcast_chan(&db)?;
|
||||
|
||||
let pd = index_podcast(&db, &chan, &feed)?;
|
||||
let pd = index_podcast(db, &chan, source)?;
|
||||
|
||||
let _: Vec<_> = chan.items()
|
||||
.iter()
|
||||
.map(|x| index_episode(&db, &x, &pd))
|
||||
.map(|x| index_episode(db, &x, &pd))
|
||||
.collect();
|
||||
|
||||
info!("{:#?}", pd);
|
||||
@ -123,3 +117,24 @@ pub fn index_loop(db: SqliteConnection) -> Result<()> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: make it into an iterator that yields reqwest::response
|
||||
pub fn fetch_feeds(connection: &SqliteConnection) -> Result<Vec<(reqwest::Response, Source)>> {
|
||||
let mut results = Vec::new();
|
||||
|
||||
let mut feeds = dbqueries::get_sources(connection)?;
|
||||
|
||||
for feed in feeds.iter_mut() {
|
||||
// TODO sent etag headers
|
||||
let req = reqwest::get(feed.uri())?;
|
||||
|
||||
// TODO match on status()
|
||||
if req.status() == reqwest::StatusCode::NotModified {
|
||||
continue;
|
||||
}
|
||||
feed.update_etag(connection, &req)?;
|
||||
results.push((req, feed.clone()));
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
@ -176,26 +176,9 @@ impl<'a> Source {
|
||||
self.http_etag = value.map(|x| x.to_string());
|
||||
}
|
||||
|
||||
/// Fetch the xml feed from the source url, update the etag headers,
|
||||
/// parse the feed into an rss:Channel and return it.
|
||||
pub fn get_podcast_chan(&mut self, con: &SqliteConnection) -> Result<Channel> {
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
|
||||
let mut req = reqwest::get(&self.uri)?;
|
||||
self.update_etag(con, &req)?;
|
||||
|
||||
let mut buf = String::new();
|
||||
// info!("{}", buf);
|
||||
req.read_to_string(&mut buf)?;
|
||||
let chan = Channel::from_str(&buf)?;
|
||||
|
||||
Ok(chan)
|
||||
}
|
||||
|
||||
/// Extract Etag and LastModifier from req, and update self and the
|
||||
/// corresponding db row.
|
||||
fn update_etag(&mut self, con: &SqliteConnection, req: &reqwest::Response) -> Result<()> {
|
||||
pub fn update_etag(&mut self, con: &SqliteConnection, req: &reqwest::Response) -> Result<()> {
|
||||
let headers = req.headers();
|
||||
debug!("{:#?}", headers);
|
||||
|
||||
@ -203,12 +186,16 @@ impl<'a> Source {
|
||||
let etag = headers.get::<ETag>();
|
||||
let lmod = headers.get::<LastModified>();
|
||||
|
||||
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
|
||||
self.last_modified = lmod.map(|x| format!("{}", x));
|
||||
info!("Self etag: {:?}", self.http_etag);
|
||||
info!("Self last_mod: {:?}", self.last_modified);
|
||||
if self.http_etag() != etag.map(|x| x.tag())
|
||||
|| self.last_modified != lmod.map(|x| format!("{}", x))
|
||||
{
|
||||
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
|
||||
self.last_modified = lmod.map(|x| format!("{}", x));
|
||||
info!("Self etag: {:?}", self.http_etag);
|
||||
info!("Self last_mod: {:?}", self.last_modified);
|
||||
self.save_changes::<Source>(con)?;
|
||||
}
|
||||
|
||||
self.save_changes::<Source>(con)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user