Initial draft of hammond-data/src/index_feed.rs API redesign.

parent 3c6176b1c0
commit 54a0f17588
@@ -15,7 +15,7 @@ use rand::Rng;
 use test::Bencher;
 
 use hammond_data::run_migration_on;
-use hammond_data::index_feed::{complete_index, insert_return_source, Database};
+use hammond_data::index_feed::{index_feeds, insert_return_source, Database, Feed};
 
 // use std::io::BufRead;
 use std::path::PathBuf;
@@ -63,9 +63,10 @@ fn index_urls(m: &Database) {
         };
         // parse it into a channel
         let chan = rss::Channel::read_from(buff).unwrap();
+        let feed = Feed(chan, s);
 
         // Index the channel
-        complete_index(m, &chan, &s).unwrap();
+        index_feeds(m, &mut [feed]);
     });
 }
 
@@ -1,7 +1,6 @@
 use diesel::prelude::*;
 use diesel;
 use rss;
-use reqwest;
 use rayon::prelude::*;
 
 use dbqueries;
@@ -11,12 +10,51 @@ use feedparser;
 
 use std::sync::{Arc, Mutex};
 
-#[derive(Debug)]
-pub struct Feed(pub reqwest::Response, pub Source);
-
 pub type Database = Arc<Mutex<SqliteConnection>>;
 
-fn index_source(con: &SqliteConnection, foo: &NewSource) {
+#[derive(Debug)]
+pub struct Feed(pub rss::Channel, pub Source);
+
+impl Feed {
+    fn index(&self, db: &Database) -> Result<()> {
+        let tempdb = db.lock().unwrap();
+        let pd = self.index_channel(&tempdb)?;
+        drop(tempdb);
+
+        self.index_channel_items(db, &pd)?;
+        Ok(())
+    }
+
+    fn index_channel(&self, con: &SqliteConnection) -> Result<Podcast> {
+        let pd = feedparser::parse_podcast(&self.0, self.1.id());
+        // Convert NewPodcast to Podcast
+        insert_return_podcast(con, &pd)
+    }
+
+    fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> {
+        let it = self.0.items();
+        let episodes: Vec<_> = it.par_iter()
+            .map(|x| feedparser::parse_episode(x, pd.id()))
+            .collect();
+
+        let conn = db.lock().unwrap();
+        let e = conn.transaction::<(), Error, _>(|| {
+            episodes.iter().for_each(|x| {
+                let e = index_episode(&conn, x);
+                if let Err(err) = e {
+                    error!("Failed to index episode: {:?}.", x);
+                    error!("Error msg: {}", err);
+                };
+            });
+            Ok(())
+        });
+        drop(conn);
+
+        e
+    }
+}
+
+pub fn index_source(con: &SqliteConnection, foo: &NewSource) {
     use schema::source::dsl::*;
 
     // Throw away the result like `insert or ignore`
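The `drop(tempdb)` above is the load-bearing detail of the new `Feed::index`: the connection mutex is held only for the quick channel insert and released before the episode pass takes it again. A minimal, self-contained sketch of that lock-scope pattern, with a `Vec<String>` standing in for the `SqliteConnection` (names here are illustrative, not hammond's API):

    use std::sync::{Arc, Mutex};

    // Stand-in for Arc<Mutex<SqliteConnection>>: a Vec plays the database.
    type Db = Arc<Mutex<Vec<String>>>;

    fn index(db: &Db, channel: &str, items: &[&str]) {
        // Hold the lock only for the quick channel insert...
        let mut conn = db.lock().unwrap();
        conn.push(channel.to_string());
        // ...and drop it explicitly, as Feed::index() drops `tempdb`,
        // so other threads can index while we prepare the episode pass.
        drop(conn);

        // Re-acquire for the batched item writes (a diesel transaction
        // wraps this loop in the real index_channel_items()).
        let mut conn = db.lock().unwrap();
        for item in items {
            conn.push(format!("{}/{}", channel, item));
        }
    }

    fn main() {
        let db: Db = Arc::new(Mutex::new(Vec::new()));
        index(&db, "feed-a", &["ep1", "ep2"]);
        println!("{:?}", *db.lock().unwrap());
    }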
@@ -78,96 +116,41 @@ fn insert_return_podcast(con: &SqliteConnection, pd: &NewPodcast) -> Result<Podc
 pub fn full_index_loop(db: &Database) -> Result<()> {
     let mut f = fetch_all_feeds(db)?;
 
-    index_feed(db, &mut f);
+    index_feeds(db, &mut f);
     info!("Indexing done.");
     Ok(())
 }
 
-pub fn index_feed(db: &Database, f: &mut [Feed]) {
-    f.par_iter_mut()
-        .for_each(|&mut Feed(ref mut req, ref source)| {
-            let e = complete_index_from_source(req, source, db);
-            if e.is_err() {
-                error!("Error While trying to update the database.");
-                error!("Error msg: {}", e.unwrap_err());
-            };
-        });
-}
-
-pub fn complete_index_from_source(
-    req: &mut reqwest::Response,
-    source: &Source,
-    db: &Database,
-) -> Result<()> {
-    use std::io::Read;
-    use std::str::FromStr;
-
-    let mut buf = String::new();
-    req.read_to_string(&mut buf)?;
-    let chan = rss::Channel::from_str(&buf)?;
-
-    complete_index(db, &chan, source)
-}
-
-pub fn complete_index(db: &Database, chan: &rss::Channel, parent: &Source) -> Result<()> {
-    let pd = {
-        let conn = db.lock().unwrap();
-        index_channel(&conn, chan, parent)?
-    };
-    index_channel_items(db, chan.items(), &pd);
-    Ok(())
-}
-
-fn index_channel(con: &SqliteConnection, chan: &rss::Channel, parent: &Source) -> Result<Podcast> {
-    let pd = feedparser::parse_podcast(chan, parent.id());
-    // Convert NewPodcast to Podcast
-    insert_return_podcast(con, &pd)
-}
-
-fn index_channel_items(db: &Database, it: &[rss::Item], pd: &Podcast) {
-    let episodes: Vec<_> = it.par_iter()
-        .map(|x| feedparser::parse_episode(x, pd.id()))
-        .collect();
-
-    let conn = db.lock().unwrap();
-    let e = conn.transaction::<(), Error, _>(|| {
-        episodes.iter().for_each(|x| {
-            let e = index_episode(&conn, x);
-            if let Err(err) = e {
-                error!("Failed to index episode: {:?}.", x);
-                error!("Error msg: {}", err);
-            };
-        });
-        Ok(())
-    });
-    drop(conn);
-
-    if let Err(err) = e {
-        error!("Episodes Transcaction Failed.");
-        error!("Error msg: {}", err);
-    };
-}
-
-// Maybe this can be refactored into an Iterator for lazy evaluation.
+pub fn index_feeds(db: &Database, f: &mut [Feed]) {
+    f.into_par_iter().for_each(|x| {
+        let e = x.index(db);
+        if e.is_err() {
+            error!("Error While trying to update the database.");
+            error!("Error msg: {}", e.unwrap_err());
+        };
+    });
+}
+
 pub fn fetch_all_feeds(db: &Database) -> Result<Vec<Feed>> {
-    let mut feeds = {
+    let feeds = {
         let conn = db.lock().unwrap();
         dbqueries::get_sources(&conn)?
     };
 
-    let results = fetch_feeds(db, &mut feeds);
+    let results = fetch_feeds(db, feeds);
     Ok(results)
 }
 
-pub fn fetch_feeds(db: &Database, feeds: &mut [Source]) -> Vec<Feed> {
-    let results: Vec<Feed> = feeds
-        .par_iter_mut()
+pub fn fetch_feeds(db: &Database, feeds: Vec<Source>) -> Vec<Feed> {
+    let results: Vec<_> = feeds
+        .into_par_iter()
         .filter_map(|x| {
-            let l = refresh_source(db, x);
+            let uri = x.uri().to_owned();
+            let l = x.refresh(db);
             if l.is_ok() {
                 l.ok()
             } else {
-                error!("Error While trying to fetch from source: {}.", x.uri());
+                error!("Error While trying to fetch from source: {}.", uri);
                 error!("Error msg: {}", l.unwrap_err());
                 None
             }
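With the `Feed` type in place, `index_feeds` shrinks to a rayon `for_each` that logs failures instead of propagating them, so one broken feed can't abort the whole batch. A hedged, runnable sketch of that contract, assuming the `rayon` crate and a toy `Feed` in place of hammond's:

    use rayon::prelude::*;

    // Toy Feed: a uri string plays the parsed channel + source pair.
    struct Feed(String);

    impl Feed {
        fn index(&self) -> Result<(), String> {
            if self.0.is_empty() {
                Err("empty uri".into())
            } else {
                Ok(()) // pretend the channel and episodes were inserted
            }
        }
    }

    fn index_feeds(f: &mut [Feed]) {
        // A &mut [Feed] parallel-iterates as &mut Feed items; failures
        // are only logged, so one bad feed never aborts the batch.
        f.into_par_iter().for_each(|x| {
            if let Err(e) = x.index() {
                eprintln!("Error While trying to update the database.");
                eprintln!("Error msg: {}", e);
            }
        });
    }

    fn main() {
        let mut feeds = vec![
            Feed("https://example.com/rss".into()),
            Feed(String::new()), // this one fails, the other still indexes
        ];
        index_feeds(&mut feeds);
    }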
@@ -177,43 +160,6 @@ pub fn fetch_feeds(db: &Database, feeds: &mut [Source]) -> Vec<Feed> {
     results
 }
 
-pub fn refresh_source(db: &Database, feed: &mut Source) -> Result<Feed> {
-    use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified};
-
-    let mut headers = Headers::new();
-
-    if let Some(foo) = feed.http_etag() {
-        headers.set(ETag(EntityTag::new(true, foo.to_owned())));
-    }
-
-    if let Some(foo) = feed.last_modified() {
-        if let Ok(x) = foo.parse::<HttpDate>() {
-            headers.set(LastModified(x));
-        }
-    }
-
-    // FIXME: I have fucked up somewhere here.
-    // Getting back 200 codes even though I supposedly sent etags.
-    // info!("Headers: {:?}", headers);
-    let client = reqwest::Client::builder().referer(false).build()?;
-    let req = client.get(feed.uri()).headers(headers).send()?;
-
-    info!("GET to {} , returned: {}", feed.uri(), req.status());
-
-    // TODO match on more stuff
-    // 301: Permanent redirect of the url
-    // 302: Temporary redirect of the url
-    // 304: Up to date Feed, checked with the Etag
-    // 410: Feed deleted
-    // match req.status() {
-    //     reqwest::StatusCode::NotModified => (),
-    //     _ => (),
-    // };
-
-    feed.update_etag(db, &req)?;
-    Ok(Feed(req, feed.clone()))
-}
-
 #[cfg(test)]
 mod tests {
 
@@ -308,9 +254,10 @@ mod tests {
         let feed = fs::File::open(path).unwrap();
         // parse it into a channel
         let chan = rss::Channel::read_from(BufReader::new(feed)).unwrap();
+        let feed = Feed(chan, s);
 
         // Index the channel
-        complete_index(&m, &chan, &s).unwrap();
+        index_feeds(&m, &mut [feed]);
     });
 
     // Assert the index rows equal the controlled results
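The updated test keeps indexing fully offline: it parses an `rss::Channel` from a local reader and wraps it in a `Feed` by hand, with no HTTP involved. A small sketch of that parsing step, assuming the `rss` crate; the inline XML is made up for illustration:

    use std::io::Cursor;

    fn main() {
        // Made-up XML; any BufRead works, which is what keeps the test offline.
        let xml = r#"<rss version="2.0"><channel>
            <title>Example</title>
            <link>https://example.com</link>
            <description>demo</description>
        </channel></rss>"#;

        let chan = rss::Channel::read_from(Cursor::new(xml.as_bytes())).unwrap();
        println!("parsed: {}", chan.title());
        // In the test above, this channel plus its Source becomes
        // Feed(chan, s), then index_feeds(&m, &mut [feed]).
    }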
@@ -1,12 +1,19 @@
+use diesel::prelude::*;
+
 use schema::{episode, podcast, source};
 
+use models::Source;
+use index_feed::Database;
+use index_feed;
+use dbqueries;
+
 #[derive(Insertable)]
 #[table_name = "source"]
 #[derive(Debug, Clone)]
 pub struct NewSource<'a> {
     pub uri: &'a str,
-    pub last_modified: Option<&'a str>,
-    pub http_etag: Option<&'a str>,
+    last_modified: Option<&'a str>,
+    http_etag: Option<&'a str>,
 }
 
 impl<'a> NewSource<'a> {
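Dropping `pub` from `last_modified` and `http_etag` means callers outside the module can no longer build a `NewSource` literal; construction has to go through the constructor, which keeps the cache-header fields under the crate's control. A sketch of the effect (`new_with_uri` is a hypothetical name standing in for the real constructor, whose tail is visible in the next hunk):

    mod insertables {
        #[derive(Debug, Clone)]
        pub struct NewSource<'a> {
            pub uri: &'a str,
            last_modified: Option<&'a str>,
            http_etag: Option<&'a str>,
        }

        impl<'a> NewSource<'a> {
            // Hypothetical constructor name; cache headers start empty
            // and are only touched by the crate itself.
            pub fn new_with_uri(uri: &'a str) -> NewSource<'a> {
                NewSource {
                    uri,
                    last_modified: None,
                    http_etag: None,
                }
            }
        }
    }

    fn main() {
        let s = insertables::NewSource::new_with_uri("https://example.com/rss");
        println!("{:?}", s);
        // A struct literal would not compile out here:
        // `last_modified` and `http_etag` are private to the module.
    }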
@@ -17,6 +24,12 @@ impl<'a> NewSource<'a> {
             http_etag: None,
         }
     }
+
+    pub fn into_source(self, db: &Database) -> QueryResult<Source> {
+        let tempdb = db.lock().unwrap();
+        index_feed::index_source(&tempdb, &self);
+        dbqueries::get_source_from_uri(&tempdb, self.uri)
+    }
 }
 
 #[derive(Insertable)]
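`into_source` packages the existing insert-or-ignore flow: write the row if it's new, then read the canonical row back so the caller always holds what the database holds. A self-contained sketch of that round-trip with a `HashMap` standing in for the `source` table (names illustrative):

    use std::collections::HashMap;

    struct NewSource<'a> {
        uri: &'a str,
    }

    #[derive(Debug, Clone)]
    struct Source {
        id: u32,
        uri: String,
    }

    impl<'a> NewSource<'a> {
        // Mirrors the shape of into_source(): insert-or-ignore, then
        // read the canonical row back. A HashMap plays the source table.
        fn into_source(self, table: &mut HashMap<String, Source>) -> Source {
            let next_id = table.len() as u32 + 1;
            table
                .entry(self.uri.to_string())
                .or_insert_with(|| Source {
                    id: next_id,
                    uri: self.uri.to_string(),
                })
                .clone()
        }
    }

    fn main() {
        let mut table = HashMap::new();
        let first = NewSource { uri: "https://example.com/rss" }.into_source(&mut table);
        let again = NewSource { uri: "https://example.com/rss" }.into_source(&mut table);
        assert_eq!(first.id, again.id); // the second insert was ignored
    }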
@@ -3,13 +3,17 @@ use reqwest;
 use diesel::SaveChangesDsl;
 use diesel::result::QueryResult;
 use reqwest::header::{ETag, LastModified};
+use rss::Channel;
 
 use schema::{episode, podcast, source};
-use index_feed::Database;
+use index_feed::{Database, Feed};
 use errors::*;
 
 use models::insertables::NewPodcast;
 
+use std::io::Read;
+use std::str::FromStr;
+
 #[derive(Queryable, Identifiable, AsChangeset, Associations)]
 #[table_name = "episode"]
 #[changeset_options(treat_none_as_null = "true")]
@@ -272,7 +276,7 @@ impl<'a> Source {
 
     /// Extract Etag and LastModifier from req, and update self and the
     /// corresponding db row.
-    pub fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> {
+    fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> {
         let headers = req.headers();
 
         // let etag = headers.get_raw("ETag").unwrap();
@@ -295,4 +299,46 @@ impl<'a> Source {
         let tempdb = db.lock().unwrap();
         self.save_changes::<Source>(&*tempdb)
     }
+
+    pub fn refresh(mut self, db: &Database) -> Result<Feed> {
+        use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified};
+
+        let mut headers = Headers::new();
+
+        if let Some(foo) = self.http_etag() {
+            headers.set(ETag(EntityTag::new(true, foo.to_owned())));
+        }
+
+        if let Some(foo) = self.last_modified() {
+            if let Ok(x) = foo.parse::<HttpDate>() {
+                headers.set(LastModified(x));
+            }
+        }
+
+        // FIXME: I have fucked up somewhere here.
+        // Getting back 200 codes even though I supposedly sent etags.
+        // info!("Headers: {:?}", headers);
+        let client = reqwest::Client::builder().referer(false).build()?;
+        let mut req = client.get(self.uri()).headers(headers).send()?;
+
+        info!("GET to {} , returned: {}", self.uri(), req.status());
+
+        // TODO match on more stuff
+        // 301: Permanent redirect of the url
+        // 302: Temporary redirect of the url
+        // 304: Up to date Feed, checked with the Etag
+        // 410: Feed deleted
+        // match req.status() {
+        //     reqwest::StatusCode::NotModified => (),
+        //     _ => (),
+        // };
+
+        self.update_etag(db, &req)?;
+
+        let mut buf = String::new();
+        req.read_to_string(&mut buf)?;
+        let chan = Channel::from_str(&buf)?;
+
+        Ok(Feed(chan, self))
+    }
 }
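`refresh` deliberately takes `mut self` by value: the `Source` is consumed and handed back inside the `Feed`, so a stale copy whose etag was never updated can't linger at the call site. A std-only sketch of that ownership shape, with strings standing in for the reqwest/rss types:

    // Strings stand in for reqwest::Response and rss::Channel.
    #[derive(Debug)]
    struct Source {
        uri: String,
        http_etag: Option<String>,
    }

    #[derive(Debug)]
    struct Feed(String, Source); // (channel body, its Source)

    impl Source {
        // By-value self: the refreshed Source only exists inside the
        // Feed it produced, so a stale copy can't be re-indexed.
        fn refresh(mut self) -> Result<Feed, String> {
            let body = format!("<rss>... fetched from {} ...</rss>", self.uri);
            self.http_etag = Some("\"abc123\"".to_string()); // pretend update_etag()
            Ok(Feed(body, self))
        }
    }

    fn main() {
        let source = Source {
            uri: "https://example.com/rss".to_string(),
            http_etag: None,
        };
        let feed = source.refresh().unwrap();
        println!("{:?}", feed);
        // `source` was moved into refresh(); touching it here would
        // be a compile error.
    }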
@@ -42,14 +42,14 @@ pub fn refresh_feed(
 
     let feeds = {
         if let Some(mut vec) = source {
-            Ok(index_feed::fetch_feeds(&db, &mut vec))
+            Ok(index_feed::fetch_feeds(&db, vec))
        } else {
             index_feed::fetch_all_feeds(&db)
         }
     };
 
     if let Ok(mut x) = feeds {
-        index_feed::index_feed(&db, &mut x);
+        index_feed::index_feeds(&db, &mut x);
         info!("Indexing done.");
 
         sender.send(true).expect("Couldn't send data to channel");;
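End to end, the GTK handler now composes fetch and index and then notifies the waiting thread over a channel. A self-contained sketch of that control flow using `std::sync::mpsc` and toy stand-ins for hammond's `Database`, fetch, and index steps (all names here are illustrative):

    use std::sync::mpsc;
    use std::thread;

    struct Feed;

    fn fetch_all_feeds() -> Result<Vec<Feed>, String> {
        Ok(vec![Feed, Feed])
    }

    fn index_feeds(feeds: &mut [Feed]) {
        let _ = feeds; // indexing elided
    }

    fn refresh_feed(sender: mpsc::Sender<bool>) {
        thread::spawn(move || {
            if let Ok(mut feeds) = fetch_all_feeds() {
                index_feeds(&mut feeds);
                // Tell the waiting (UI) thread the refresh finished.
                sender.send(true).expect("Couldn't send data to channel");
            }
        });
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        refresh_feed(tx);
        assert!(rx.recv().unwrap());
    }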