hammond-data: index_loop no longer waits for every GET request to finish before indexing.

Jordan Petridis 2017-12-26 18:18:48 +02:00
parent 41abeeb2e3
commit bfb74c4dba
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
4 changed files with 43 additions and 80 deletions
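What changed, in short: the old pipeline fetched every `Source` into a `Vec<Feed>` — a `collect()` that blocks until the last GET returns — and only then started indexing. The new `index_loop` fetches and indexes per source, so each rayon worker writes its feed to the database as soon as its own request completes. A minimal sketch of the two shapes, using stand-in `Source`/`Feed` types and stub `fetch`/`index` functions rather than the real hammond_data API:

    use rayon::prelude::*;

    // Stand-ins for hammond_data's Source and Feed, just to show the two shapes.
    struct Source;
    struct Feed;

    fn fetch(_source: Source) -> Result<Feed, ()> {
        Ok(Feed) // the real fetch performs the blocking GET here
    }

    fn index(_feed: &Feed) {
        // the real index writes the feed into the database here
    }

    // Old shape: .collect() is a synchronization point, so nothing is indexed
    // until every GET request has finished.
    fn old_flow(sources: Vec<Source>) {
        let feeds: Vec<Feed> = sources
            .into_par_iter()
            .filter_map(|s| fetch(s).ok())
            .collect();
        feeds.par_iter().for_each(index);
    }

    // New shape (index_loop): each worker indexes its feed as soon as its own
    // fetch returns, independent of the other requests.
    fn new_flow(sources: Vec<Source>) {
        sources
            .into_par_iter()
            .filter_map(|s| fetch(s).ok())
            .for_each(|f| index(&f));
    }

    fn main() {
        old_flow(vec![Source, Source]);
        new_flow(vec![Source, Source]);
    }

Dropping the intermediate `collect()` is the whole change; the rest of the diff is plumbing that adapts the call sites.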

View File

@@ -13,7 +13,7 @@ use rayon::prelude::*;
 use test::Bencher;
 use hammond_data::Source;
-use hammond_data::feed::{index, Feed};
+use hammond_data::feed::*;
 use std::io::BufReader;

@@ -43,7 +43,7 @@ fn index_urls() {
         })
         .collect();
-    index(feeds);
+    feeds.par_iter().for_each(|x| index(x));
 }

 #[bench]

View File

@@ -99,58 +99,42 @@ impl Feed {
             .collect();

         Ok(episodes)
-        // This would return every episode of the feed from the db.
-        // self.index_channel_items(&pd)?;
-        // Ok(dbqueries::get_pd_episodes(&pd)?)
     }
 }

-/// Use's `fetch_all` to retrieve a list of `Feed`s and use index them using `feed::index`.
-pub fn index_all() -> Result<()> {
-    let feeds = fetch_all()?;
-    index(feeds);
-    Ok(())
-}
-
-/// Handle the indexing of a feed `F` into the Database.
-///
-/// Consume a `ParallelIterator<Feed>` and index it.
-pub fn index<F: IntoParallelIterator<Item = Feed>>(feeds: F) {
-    feeds.into_par_iter().for_each(|f| {
-        let e = f.index();
-        if e.is_err() {
-            error!("Error While trying to update the database.");
-            error!("Error msg: {}", e.unwrap_err());
-        };
-    });
-    info!("Indexing done.");
-}
-
-/// Retrieve a list of all the `Source` in the database,
-/// then use `feed::fetch` to convert them into `Feed`s
-/// and return them.
-pub fn fetch_all() -> Result<Vec<Feed>> {
-    let feeds = dbqueries::get_sources()?;
-    Ok(fetch(feeds))
-}
-
-/// Consume a `ParallelIterator<Source>` and return a list of `Feed`s.
-pub fn fetch<F: IntoParallelIterator<Item = Source>>(feeds: F) -> Vec<Feed> {
-    let results: Vec<_> = feeds
-        .into_par_iter()
-        .filter_map(|x| {
-            let uri = x.uri().to_owned();
-            let feed = Feed::from_source(x).ok();
-            if feed.is_none() {
-                error!("Error While trying to fetch from source url: {}.", uri);
-            }
-            feed
-        })
-        .collect();
-    results
+/// Handle the indexing of a `Feed` into the Database.
+pub fn index(feed: &Feed) {
+    if let Err(err) = feed.index() {
+        error!("Error While trying to update the database.");
+        error!("Error msg: {}", err);
+    };
+}
+
+/// Consume a `Source` and return a `Feed`.
+fn fetch(source: Source) -> Result<Feed> {
+    let uri = source.uri().to_owned();
+    let feed = Feed::from_source(source);
+    if feed.is_err() {
+        error!("Error While trying to fetch from source url: {}.", uri);
+    }
+    feed
+}
+
+/// Index a "list" of `Source`s.
+pub fn index_loop<S: IntoParallelIterator<Item = Source>>(sources: S) {
+    sources
+        .into_par_iter()
+        .filter_map(|x| fetch(x).ok())
+        .for_each(|x| index(&x));
+    info!("Indexing done.");
+}
+
+/// Retrieves all `Sources` from the database and updates/indexes them.
+pub fn index_all() -> Result<()> {
+    let sources = dbqueries::get_sources()?;
+    index_loop(sources);
+    Ok(())
 }

 #[cfg(test)]
@@ -183,25 +167,6 @@ mod tests {
         index_all().unwrap();
     }

-    #[test]
-    /// Insert feeds and update/index them.
-    fn test_fetch_loop() {
-        truncate_db().unwrap();
-
-        let inpt = vec![
-            "https://request-for-explanation.github.io/podcast/rss.xml",
-            "https://feeds.feedburner.com/InterceptedWithJeremyScahill",
-            "http://feeds.propublica.org/propublica/podcast",
-            "http://feeds.feedburner.com/linuxunplugged",
-        ];
-
-        inpt.iter().for_each(|url| {
-            // Index the urls into the source table.
-            Source::from_url(url).unwrap();
-        });
-
-        fetch_all().unwrap();
-    }
-
     #[test]
     fn test_complete_index() {
         // vec of (path, url) tuples.
@@ -240,7 +205,7 @@ mod tests {
             .collect();

         // Index the channels
-        index(feeds);
+        feeds.par_iter().for_each(|x| index(&x));

         // Assert the index rows equal the controlled results
         assert_eq!(dbqueries::get_sources().unwrap().len(), 4);
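Note that `index_loop` keeps the generic `IntoParallelIterator<Item = Source>` bound, which is why `index_all` can pass the `Vec<Source>` from `dbqueries::get_sources()` straight through. A small self-contained sketch of what that bound accepts (a stand-in `Source`, not the real type):

    use rayon::prelude::*;

    #[derive(Clone)]
    struct Source(u32);

    // Same bound as the new index_loop above.
    fn index_loop<S: IntoParallelIterator<Item = Source>>(sources: S) {
        sources
            .into_par_iter()
            .for_each(|s| println!("indexing source {}", s.0));
    }

    fn main() {
        let sources = vec![Source(1), Source(2), Source(3)];
        index_loop(sources.clone());         // a plain Vec<Source> works,
        index_loop(sources.into_par_iter()); // and so does any ParallelIterator
    }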

View File

@@ -212,7 +212,7 @@ mod tests {
         // Convert Source it into a Feed and index it
         let feed = source.into_feed().unwrap();
-        index(vec![feed]);
+        index(&feed);

         // Get the Podcast
         let pd = dbqueries::get_podcast_from_source_id(sid).unwrap().into();

View File

@@ -34,20 +34,18 @@ pub fn refresh_feed(content: Rc<Content>, source: Option<Vec<Source>>) {
     }));

     thread::spawn(move || {
-        let feeds = {
-            if let Some(vec) = source {
-                Ok(feed::fetch(vec))
+        if let Some(s) = source {
+            feed::index_loop(s);
         } else {
-                feed::fetch_all()
-            }
+            let e = feed::index_all();
+            if let Err(err) = e {
+                error!("Error While trying to update the database.");
+                error!("Error msg: {}", err);
+            };
         };

-        if let Ok(x) = feeds {
-            feed::index(x);
         sender.send(true).expect("Couldn't send data to channel");;
         glib::idle_add(refresh_podcasts_view);
-        };
     });
 }
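The GTK side keeps its existing pattern: blocking fetch/index work on a spawned thread, a channel send on completion, and an idle callback to refresh the view. A rough stand-in for that shape using only std's mpsc (no gtk/glib dependencies, with the indexing call elided):

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (sender, receiver) = mpsc::channel();

        // Worker thread: this is where feed::index_loop / feed::index_all run.
        thread::spawn(move || {
            // ... blocking network and database work ...
            sender.send(true).expect("Couldn't send data to channel");
        });

        // Main-loop stand-in: glib::idle_add(refresh_podcasts_view) plays this
        // role in the real code.
        if receiver.recv().unwrap() {
            println!("refresh podcasts view");
        }
    }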
@@ -113,7 +111,7 @@ mod tests {
         // Convert Source it into a Feed and index it
         let feed = source.into_feed().unwrap();
-        index(vec![feed]);
+        index(&feed);

         // Get the Podcast
         let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();