Improved handling of some Result types in hammond-data crate.

This commit is contained in:
Jordan Petridis 2017-10-21 04:58:15 +03:00
parent a6f03ff27c
commit 048d4800da
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
5 changed files with 67 additions and 46 deletions

View File

@ -13,7 +13,7 @@ CREATE TABLE `source` (
CREATE TABLE `episode` (
`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
`title` TEXT,
`uri` TEXT UNIQUE,
`uri` TEXT NOT NULL UNIQUE,
`local_uri` TEXT,
`description` TEXT,
`published_date` TEXT ,

View File

@ -2,12 +2,11 @@ use rss::{Channel, Item};
use rfc822_sanitizer::parse_from_rfc2822_with_fallback;
use models;
use errors::*;
// TODO: look into how bad-utf8 is handled in rss crate,
// and figure if there is a need for checking before parsing.
// TODO: Extend the support for parsing itunes extensions
pub fn parse_podcast(chan: &Channel, source_id: i32) -> Result<models::NewPodcast> {
pub fn parse_podcast(chan: &Channel, source_id: i32) -> models::NewPodcast {
let title = chan.title().trim().to_owned();
let link = chan.link().to_owned();
let description = chan.description().trim().to_owned();
@ -20,26 +19,32 @@ pub fn parse_podcast(chan: &Channel, source_id: i32) -> Result<models::NewPodcas
chan.image().map(|foo| foo.url().to_owned())
};
let foo = models::NewPodcast {
models::NewPodcast {
title,
link,
description,
image_uri,
source_id,
};
Ok(foo)
}
}
pub fn parse_episode(item: &Item, parent_id: i32) -> Result<models::NewEpisode> {
pub fn parse_episode(item: &Item, parent_id: i32) -> models::NewEpisode {
let title = item.title().map(|s| s.trim());
let description = item.description().map(|s| s.trim());
let guid = item.guid().map(|x| x.value().trim());
let local_uri = None;
let mut uri = item.enclosure().map(|x| x.url().trim());
if uri == None {
uri = item.link();
}
// It's kinda weird this being an Option type.
// RSS 2.0 specifies that it's optional.
// Though the db schema has a requirement of episode uri being Unique && Not Null.
// TODO: Restructure
let uri = if item.enclosure().map(|x| x.url().trim()).is_some() {
item.enclosure().map(|x| x.url().trim())
} else if item.link().is_some() {
item.link()
} else {
None
};
let date = parse_from_rfc2822_with_fallback(
// Default to the rfc2822 representation of epoch 0.
@ -53,7 +58,7 @@ pub fn parse_episode(item: &Item, parent_id: i32) -> Result<models::NewEpisode>
let length = item.enclosure().map(|x| x.length().parse().unwrap_or(0));
let foo = models::NewEpisode {
models::NewEpisode {
title,
uri,
local_uri,
@ -63,8 +68,7 @@ pub fn parse_episode(item: &Item, parent_id: i32) -> Result<models::NewEpisode>
epoch,
guid,
podcast_id: parent_id,
};
Ok(foo)
}
}
@ -86,7 +90,7 @@ mod tests {
the crucial issues of our time: national security, civil liberties, foreign \
policy, and criminal justice. Plus interviews with artists, thinkers, and \
newsmakers who challenge our preconceptions about the world we live in.";
let pd = parse_podcast(&channel, 0).unwrap();
let pd = parse_podcast(&channel, 0);
assert_eq!(pd.title, "Intercepted with Jeremy Scahill".to_string());
assert_eq!(pd.link, "https://theintercept.com/podcasts".to_string());
@ -110,7 +114,7 @@ mod tests {
let descr = "Latest Articles and Investigations from ProPublica, an independent, \
non-profit newsroom that produces investigative journalism in the public \
interest.";
let pd = parse_podcast(&channel, 0).unwrap();
let pd = parse_podcast(&channel, 0);
assert_eq!(pd.title, "The Breakthrough".to_string());
assert_eq!(pd.link, "http://www.propublica.org/podcast".to_string());
@ -129,7 +133,7 @@ mod tests {
let descr = "An open show powered by community LINUX Unplugged takes the best attributes \
of open collaboration and focuses them into a weekly lifestyle show about \
Linux.";
let pd = parse_podcast(&channel, 0).unwrap();
let pd = parse_podcast(&channel, 0);
assert_eq!(pd.title, "LINUX Unplugged Podcast".to_string());
assert_eq!(pd.link, "http://www.jupiterbroadcasting.com/".to_string());
@ -145,7 +149,7 @@ mod tests {
let file = File::open("tests/feeds/R4Explanation.xml").unwrap();
let channel = Channel::read_from(BufReader::new(file)).unwrap();
let pd = parse_podcast(&channel, 0).unwrap();
let pd = parse_podcast(&channel, 0);
let descr = "A weekly discussion of Rust RFCs";
assert_eq!(pd.title, "Request For Explanation".to_string());
@ -171,7 +175,7 @@ mod tests {
Commentator Shaun King explains his call for a boycott of the NFL and \
talks about his campaign to bring violent neo-Nazis to justice. Rapper \
Open Mike Eagle performs.";
let i = parse_episode(&firstitem, 0).unwrap();
let i = parse_episode(&firstitem, 0);
assert_eq!(i.title, Some("The Super Bowl of Racism"));
assert_eq!(i.uri, Some("http://traffic.megaphone.fm/PPY6458293736.mp3"));
@ -185,7 +189,7 @@ mod tests {
assert_eq!(i.epoch, 1505296800);
let second = channel.items().iter().nth(1).unwrap();
let i2 = parse_episode(&second, 0).unwrap();
let i2 = parse_episode(&second, 0);
let descr2 = "This week on Intercepted: Jeremy gives an update on the aftermath of \
Blackwaters 2007 massacre of Iraqi civilians. Intercept reporter Lee Fang \
@ -218,7 +222,7 @@ mod tests {
let firstitem = channel.items().first().unwrap();
let descr = "<p>A reporter finds that homes meant to replace New Yorks troubled \
psychiatric hospitals might be just as bad.</p>";
let i = parse_episode(&firstitem, 0).unwrap();
let i = parse_episode(&firstitem, 0);
assert_eq!(
i.title,
@ -244,7 +248,7 @@ mod tests {
assert_eq!(i.epoch, 1504872000);
let second = channel.items().iter().nth(1).unwrap();
let i2 = parse_episode(&second, 0).unwrap();
let i2 = parse_episode(&second, 0);
let descr2 = "<p>Jonathan Allen and Amie Parnes didnt know their \
book would be called Shattered, or that their extraordinary access would \
let them chronicle the mounting signs of a doomed campaign.</p>";
@ -285,7 +289,7 @@ mod tests {
decides to blow off a little steam by attacking his IoT devices, Wes \
has the scope on Equifax blaming open source & the Beard just saved \
the show. Its a really packed episode!";
let i = parse_episode(&firstitem, 0).unwrap();
let i = parse_episode(&firstitem, 0);
assert_eq!(i.title, Some("Hacking Devices with Kali Linux | LUP 214"));
assert_eq!(
@ -302,7 +306,7 @@ mod tests {
assert_eq!(i.epoch, 1505280282);
let second = channel.items().iter().nth(1).unwrap();
let i2 = parse_episode(&second, 0).unwrap();
let i2 = parse_episode(&second, 0);
let descr2 = "<p>The Gnome project is about to solve one of our audience's biggest \
Waylands concerns. But as the project takes on a new level of relevance, \
@ -335,7 +339,7 @@ mod tests {
let descr = "This week we look at <a \
href=\"https://github.com/rust-lang/rfcs/pull/2094\">RFC 2094</a> \
\"Non-lexical lifetimes\"";
let i = parse_episode(&firstitem, 0).unwrap();
let i = parse_episode(&firstitem, 0);
assert_eq!(i.title, Some("Episode #9 - A Once in a Lifetime RFC"));
assert_eq!(
@ -358,7 +362,7 @@ mod tests {
assert_eq!(i.epoch, 1503957600);
let second = channel.items().iter().nth(8).unwrap();
let i2 = parse_episode(&second, 0).unwrap();
let i2 = parse_episode(&second, 0);
let descr2 = "This week we look at <a \
href=\"https://github.com/rust-lang/rfcs/pull/2071\">RFC 2071</a> \"Add \

View File

@ -78,20 +78,24 @@ fn insert_return_episode(con: &SqliteConnection, ep: &NewEpisode) -> Result<Epis
Ok(dbqueries::load_episode(con, ep.uri.unwrap())?)
}
pub fn index_loop(db: Arc<Mutex<SqliteConnection>>, force: bool) -> Result<()> {
let mut f = fetch_feeds(db.clone(), force)?;
pub fn index_loop(db: &Arc<Mutex<SqliteConnection>>, force: bool) -> Result<()> {
let mut f = fetch_feeds(&db.clone(), force)?;
f.par_iter_mut().for_each(|&mut (ref mut req, ref source)| {
complete_index_from_source(req, source, db.clone()).unwrap();
let e = complete_index_from_source(req, source, &db.clone());
if e.is_err() {
error!("Error While trying to update the database.");
error!("Error msg: {}", e.unwrap_err());
};
});
info!("Indexing done.");
Ok(())
}
pub fn complete_index_from_source(
req: &mut reqwest::Response,
source: &Source,
mutex: Arc<Mutex<SqliteConnection>>,
mutex: &Arc<Mutex<SqliteConnection>>,
) -> Result<()> {
use std::io::Read;
use std::str::FromStr;
@ -106,59 +110,69 @@ pub fn complete_index_from_source(
}
fn complete_index(
mutex: Arc<Mutex<SqliteConnection>>,
connection: &Arc<Mutex<SqliteConnection>>,
chan: &rss::Channel,
parent: &Source,
) -> Result<()> {
let tempdb = mutex.lock().unwrap();
let tempdb = connection.lock().unwrap();
let pd = index_channel(&tempdb, chan, parent)?;
drop(tempdb);
index_channel_items(mutex.clone(), chan.items(), &pd)?;
index_channel_items(&connection.clone(), chan.items(), &pd)?;
Ok(())
}
fn index_channel(db: &SqliteConnection, chan: &rss::Channel, parent: &Source) -> Result<Podcast> {
let pd = feedparser::parse_podcast(chan, parent.id())?;
let pd = feedparser::parse_podcast(chan, parent.id());
// Convert NewPodcast to Podcast
let pd = insert_return_podcast(db, &pd)?;
Ok(pd)
}
// TODO: Propagate the erros from the maps up the chain.
fn index_channel_items(
mutex: Arc<Mutex<SqliteConnection>>,
mutex: &Arc<Mutex<SqliteConnection>>,
i: &[rss::Item],
pd: &Podcast,
) -> Result<()> {
let foo: Vec<_> = i.par_iter()
.map(|x| feedparser::parse_episode(x, pd.id()).unwrap())
.map(|x| feedparser::parse_episode(x, pd.id()))
.collect();
foo.par_iter().for_each(|x| {
let dbmutex = mutex.clone();
let db = dbmutex.lock().unwrap();
index_episode(&db, x).unwrap();
let e = index_episode(&db, x);
if let Err(err) = e {
error!("Failed to index episode: {:?}.", x);
error!("Error msg: {}", err);
};
});
Ok(())
}
// Maybe this can be refactored into an Iterator for lazy evaluation.
pub fn fetch_feeds(
connection: Arc<Mutex<SqliteConnection>>,
connection: &Arc<Mutex<SqliteConnection>>,
force: bool,
) -> Result<Vec<(reqwest::Response, Source)>> {
let tempdb = connection.lock().unwrap();
let mut feeds = dbqueries::get_sources(&tempdb)?;
drop(tempdb);
let results: Vec<_> = feeds
let results: Vec<(reqwest::Response, Source)> = feeds
.par_iter_mut()
.map(|x| {
.filter_map(|x| {
let dbmutex = connection.clone();
let db = dbmutex.lock().unwrap();
refresh_source(&db, x, force).unwrap()
let l = refresh_source(&db, x, force);
if let Ok(res) = l {
Some(res)
} else {
error!("Error While trying to fetch from source: {}.", x.uri());
error!("Error msg: {}", l.unwrap_err());
None
}
})
.collect();
@ -260,10 +274,10 @@ mod tests {
index_source(&tempdb, &NewSource::new_with_uri(feed)).unwrap()
});
index_loop(db.clone(), true).unwrap();
index_loop(&db.clone(), true).unwrap();
// Run again to cover Unique constraint errors.
index_loop(db.clone(), true).unwrap();
index_loop(&db.clone(), true).unwrap();
}
#[test]
@ -304,7 +318,7 @@ mod tests {
let chan = rss::Channel::read_from(BufReader::new(feed)).unwrap();
// Index the channel
complete_index(m.clone(), &chan, &s).unwrap();
complete_index(&m.clone(), &chan, &s).unwrap();
});
// Assert the index rows equal the controlled results

View File

@ -23,6 +23,7 @@ pub fn refresh_db(db: Arc<Mutex<SqliteConnection>>, stack: gtk::Stack) {
});
// FIXME: atm freezing the ui till update is done.
// Make it instead emit a signal on update completion.
// TODO: emit a signal in order to update the podcast widget.
handle.join();
podcasts_view::update_podcasts_view(db.clone(), stack.clone());
@ -51,6 +52,7 @@ pub fn refresh_feed(db: Arc<Mutex<SqliteConnection>>, stack: gtk::Stack, source:
});
// FIXME: atm freezing the ui till update is done.
// Make it instead emit a signal on update completion.
// TODO: emit a signal in order to update the podcast widget.
handle.join();
podcasts_view::update_podcasts_view(db.clone(), stack.clone());

View File

@ -80,6 +80,7 @@ fn epidose_widget(
error!("Error while trying to download: {}", ep_clone.uri());
error!("Error: {}", err);
};
// TODO: emit a signal in order to update the podcast widget.
});
});