hammond-data: Remove leftover synchronous code.

This commit is contained in:
Jordan Petridis 2018-01-19 15:41:37 +02:00
parent b92ba7be76
commit 46b6c0d27e
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
9 changed files with 49 additions and 131 deletions

View File

@ -44,7 +44,7 @@ static URLS: &[(&[u8], &str)] = &[
(LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"), (LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"),
]; ];
fn index_urls() { fn index_urls() -> Vec<Box<Future<Item = (), Error = Error>>> {
let feeds: Vec<_> = URLS.iter() let feeds: Vec<_> = URLS.iter()
.map(|&(buff, url)| { .map(|&(buff, url)| {
// Create and insert a Source into db // Create and insert a Source into db
@ -55,106 +55,96 @@ fn index_urls() {
}) })
.collect(); .collect();
feeds.iter().for_each(|x| x.index().unwrap()); feeds.into_iter().map(|feed| feed.index()).collect()
}
fn index_urls_async() -> Vec<Box<Future<Item = (), Error = Error>>> {
let feeds: Vec<_> = URLS.iter()
.map(|&(buff, url)| {
// Create and insert a Source into db
let s = Source::from_url(url).unwrap();
// parse it into a channel
let chan = rss::Channel::read_from(BufReader::new(buff)).unwrap();
Feed::from_channel_source(chan, s.id())
})
.collect();
feeds.into_iter().map(|feed| feed.index_async()).collect()
} }
fn bench_index_feeds(c: &mut Criterion) { fn bench_index_feeds(c: &mut Criterion) {
truncate_db().unwrap(); truncate_db().unwrap();
c.bench_function("index_feeds_sync", |b| b.iter(|| index_urls()));
}
fn bench_index_feeds_async(c: &mut Criterion) {
truncate_db().unwrap();
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
c.bench_function("index_feeds_sync", |b| { c.bench_function("index_feeds", |b| {
b.iter(|| { b.iter(|| {
let list = index_urls_async(); let list = index_urls();
let _foo = core.run(select_all(list)); let _foo = core.run(join_all(list));
}) })
}); });
truncate_db().unwrap();
} }
fn bench_index_unchanged_feeds(c: &mut Criterion) { fn bench_index_unchanged_feeds(c: &mut Criterion) {
truncate_db().unwrap(); truncate_db().unwrap();
let mut core = Core::new().unwrap();
// Index first so it will only bench the comparison test case. // Index first so it will only bench the comparison test case.
index_urls(); let list = index_urls();
let _foo = core.run(join_all(list));
c.bench_function("index_10_unchanged_sync", |b| { c.bench_function("index_5_unchanged", |b| {
b.iter(|| { b.iter(|| {
for _ in 0..10 { for _ in 0..5 {
index_urls(); let list = index_urls();
let _foo = core.run(join_all(list));
} }
}) })
}); });
truncate_db().unwrap();
} }
fn bench_get_future_feeds(c: &mut Criterion) { // This is broken and I don't know why.
fn bench_pipeline(c: &mut Criterion) {
truncate_db().unwrap(); truncate_db().unwrap();
URLS.iter().for_each(|&(_, url)| { URLS.iter().for_each(|&(_, url)| {
Source::from_url(url).unwrap(); Source::from_url(url).unwrap();
}); });
c.bench_function("index_urls_futures", |b| { c.bench_function("pipline", |b| {
b.iter(|| { b.iter(|| {
let sources = hammond_data::dbqueries::get_sources().unwrap(); let sources = hammond_data::dbqueries::get_sources().unwrap();
hammond_data::pipeline::pipeline(sources, false).unwrap(); hammond_data::pipeline::pipeline(sources, true).unwrap();
}) })
}); });
truncate_db().unwrap();
} }
fn bench_index_greater_than_code(c: &mut Criterion) { fn bench_index_large_feed(c: &mut Criterion) {
truncate_db().unwrap(); truncate_db().unwrap();
let url = "https://www.greaterthancode.com/feed/podcast"; let url = "https://www.greaterthancode.com/feed/podcast";
let mut core = Core::new().unwrap();
c.bench_function("index_greater_than_code_sync", |b| { c.bench_function("index_large_feed", |b| {
b.iter(|| { b.iter(|| {
let s = Source::from_url(url).unwrap(); let s = Source::from_url(url).unwrap();
// parse it into a channel // parse it into a channel
let chan = rss::Channel::read_from(BufReader::new(CODE)).unwrap(); let chan = rss::Channel::read_from(BufReader::new(CODE)).unwrap();
let feed = Feed::from_channel_source(chan, s.id()); let feed = Feed::from_channel_source(chan, s.id());
feed.index().unwrap(); let _foo = core.run(feed.index()).unwrap();
}) })
}); });
truncate_db().unwrap();
} }
fn bench_index_steal_the_stars(c: &mut Criterion) { fn bench_index_small_feed(c: &mut Criterion) {
truncate_db().unwrap(); truncate_db().unwrap();
let url = "https://rss.art19.com/steal-the-stars"; let url = "https://rss.art19.com/steal-the-stars";
let mut core = Core::new().unwrap();
c.bench_function("index_steal_the_stars_sync", |b| { c.bench_function("index_small_feed", |b| {
b.iter(|| { b.iter(|| {
let s = Source::from_url(url).unwrap(); let s = Source::from_url(url).unwrap();
// parse it into a channel // parse it into a channel
let chan = rss::Channel::read_from(BufReader::new(STARS)).unwrap(); let chan = rss::Channel::read_from(BufReader::new(STARS)).unwrap();
let feed = Feed::from_channel_source(chan, s.id()); let feed = Feed::from_channel_source(chan, s.id());
feed.index().unwrap(); let _foo = core.run(feed.index()).unwrap();
}) })
}); });
truncate_db().unwrap();
} }
criterion_group!( criterion_group!(
benches, benches,
bench_index_feeds, bench_index_feeds,
bench_index_feeds_async,
bench_index_unchanged_feeds, bench_index_unchanged_feeds,
bench_get_future_feeds, bench_pipeline,
bench_index_greater_than_code, bench_index_large_feed,
bench_index_steal_the_stars bench_index_small_feed
); );
criterion_main!(benches); criterion_main!(benches);

View File

@ -2,7 +2,6 @@
use futures::future::*; use futures::future::*;
use itertools::{Either, Itertools}; use itertools::{Either, Itertools};
use rayon::prelude::*;
use rss; use rss;
use dbqueries; use dbqueries;
@ -28,16 +27,10 @@ impl Feed {
} }
/// Index the contents of the RSS `Feed` into the database. /// Index the contents of the RSS `Feed` into the database.
pub fn index(&self) -> Result<()> { pub fn index(self) -> Box<Future<Item = (), Error = Error>> {
let pd = self.parse_podcast().into_podcast()?;
self.index_channel_items(&pd)
}
/// Index the contents of the RSS `Feed` into the database.
pub fn index_async(self) -> Box<Future<Item = (), Error = Error>> {
let fut = self.parse_podcast_async() let fut = self.parse_podcast_async()
.and_then(|pd| pd.into_podcast()) .and_then(|pd| pd.into_podcast())
.and_then(move |pd| self.index_channel_items_async(&pd)); .and_then(move |pd| self.index_channel_items(&pd));
Box::new(fut) Box::new(fut)
} }
@ -50,35 +43,7 @@ impl Feed {
Box::new(ok(self.parse_podcast())) Box::new(ok(self.parse_podcast()))
} }
fn index_channel_items(&self, pd: &Podcast) -> Result<()> { fn index_channel_items(&self, pd: &Podcast) -> Box<Future<Item = (), Error = Error>> {
let items = self.channel.items();
let (insert, update): (Vec<_>, Vec<_>) = items
.into_iter()
.filter_map(|item| glue(item, pd.id()).ok())
.filter(|state| match *state {
IndexState::NotChanged => false,
_ => true,
})
.partition_map(|state| match state {
IndexState::Index(e) => Either::Left(e),
IndexState::Update(e) => Either::Right(e),
// How not to use the unimplemented macro...
IndexState::NotChanged => unimplemented!(),
});
dbqueries::index_new_episodes(insert.as_slice())?;
update.par_iter().for_each(|&(ref ep, rowid)| {
if let Err(err) = ep.update(rowid) {
error!("Failed to index episode: {:?}.", ep.title());
error!("Error msg: {}", err);
};
});
Ok(())
}
fn index_channel_items_async(&self, pd: &Podcast) -> Box<Future<Item = (), Error = Error>> {
let fut = self.get_stuff(pd) let fut = self.get_stuff(pd)
.and_then(|(insert, update)| { .and_then(|(insert, update)| {
if !insert.is_empty() { if !insert.is_empty() {
@ -127,10 +92,13 @@ impl Feed {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use tokio_core::reactor::Core;
use Source; use Source;
use database::truncate_db; use database::truncate_db;
use dbqueries; use dbqueries;
use pipeline; use pipeline;
use std::fs; use std::fs;
use std::io::BufReader; use std::io::BufReader;
@ -196,8 +164,10 @@ mod tests {
}) })
.collect(); .collect();
let mut core = Core::new().unwrap();
// Index the channels // Index the channels
feeds.par_iter().for_each(|x| x.index().unwrap()); let list: Vec<_> = feeds.into_iter().map(|x| x.index()).collect();
let _foo = core.run(join_all(list));
// Assert the index rows equal the controlled results // Assert the index rows equal the controlled results
assert_eq!(dbqueries::get_sources().unwrap().len(), 4); assert_eq!(dbqueries::get_sources().unwrap().len(), 4);

View File

@ -20,7 +20,6 @@ pub use self::source::Source;
use errors::*; use errors::*;
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum IndexState<T> { pub enum IndexState<T> {
Index(T), Index(T),

View File

@ -162,7 +162,6 @@ pub(crate) struct NewEpisodeMinimal {
} }
impl NewEpisodeMinimal { impl NewEpisodeMinimal {
#[allow(dead_code)]
pub(crate) fn new(item: &rss::Item, parent_id: i32) -> Result<Self> { pub(crate) fn new(item: &rss::Item, parent_id: i32) -> Result<Self> {
if item.title().is_none() { if item.title().is_none() {
bail!("No title specified for the item.") bail!("No title specified for the item.")
@ -198,7 +197,6 @@ impl NewEpisodeMinimal {
.unwrap()) .unwrap())
} }
#[allow(dead_code)]
pub(crate) fn into_new_episode(self, item: &rss::Item) -> NewEpisode { pub(crate) fn into_new_episode(self, item: &rss::Item) -> NewEpisode {
let length = || -> Option<i32> { item.enclosure().map(|x| x.length().parse().ok())? }(); let length = || -> Option<i32> { item.enclosure().map(|x| x.length().parse().ok())? }();

View File

@ -36,7 +36,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
let list = sources let list = sources
.into_iter() .into_iter()
.map(|s| s.into_feed(&client, ignore_etags)) .map(|s| s.into_feed(&client, ignore_etags))
.map(|fut| fut.and_then(|feed| feed.index_async())) .map(|fut| fut.and_then(|feed| feed.index()))
.collect(); .collect();
let f = core.run(collect_futures(list))?; let f = core.run(collect_futures(list))?;
@ -47,7 +47,6 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
Ok(()) Ok(())
} }
#[allow(dead_code)]
fn determine_ep_state(ep: NewEpisodeMinimal, item: &rss::Item) -> Result<IndexState<NewEpisode>> { fn determine_ep_state(ep: NewEpisodeMinimal, item: &rss::Item) -> Result<IndexState<NewEpisode>> {
// Check if feed exists // Check if feed exists
let exists = dbqueries::episode_exists(ep.title(), ep.podcast_id())?; let exists = dbqueries::episode_exists(ep.title(), ep.podcast_id())?;
@ -66,13 +65,6 @@ fn determine_ep_state(ep: NewEpisodeMinimal, item: &rss::Item) -> Result<IndexSt
} }
} }
#[allow(dead_code)]
pub(crate) fn glue(item: &rss::Item, id: i32) -> Result<IndexState<NewEpisode>> {
let e = NewEpisodeMinimal::new(item, id)?;
determine_ep_state(e, item)
}
#[allow(dead_code)]
pub(crate) fn glue_async<'a>( pub(crate) fn glue_async<'a>(
item: &'a rss::Item, item: &'a rss::Item,
id: i32, id: i32,

View File

@ -218,30 +218,20 @@ mod tests {
use super::*; use super::*;
use hammond_data::Source; use hammond_data::Source;
use hammond_data::dbqueries; use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
#[test] #[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit // This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
// to run it. // to run it.
#[ignore] #[ignore]
fn test_cache_image() { fn test_cache_image() {
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let url = "http://www.newrustacean.com/feed.xml"; let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source // Create and index a source
let source = Source::from_url(url).unwrap(); let source = Source::from_url(url).unwrap();
// Copy it's id // Copy it's id
let sid = source.id(); let sid = source.id();
// Convert Source it into a future Feed and index it // Convert Source it into a future Feed and index it
let future = source.into_feed(&client, true); pipeline(vec![source], true).unwrap();
let feed = core.run(future).unwrap();
feed.index().unwrap();
// Get the Podcast // Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap().into(); let pd = dbqueries::get_podcast_from_source_id(sid).unwrap().into();

View File

@ -52,6 +52,7 @@ impl Default for Header {
impl Header { impl Header {
#[allow(dead_code)] #[allow(dead_code)]
// FIXME: should not return arc and stuff
pub fn new(content: Arc<Content>, sender: Sender<Action>) -> Arc<Header> { pub fn new(content: Arc<Content>, sender: Sender<Action>) -> Arc<Header> {
let h = Header::default(); let h = Header::default();
h.init(content, sender); h.init(content, sender);

View File

@ -117,12 +117,9 @@ pub fn add(id: i32, directory: &str, sender: Sender<Action>) {
mod tests { mod tests {
use super::*; use super::*;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use hammond_data::{Episode, Source}; use hammond_data::{Episode, Source};
use hammond_data::dbqueries; use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
use hammond_data::utils::get_download_folder; use hammond_data::utils::get_download_folder;
use std::{thread, time}; use std::{thread, time};
@ -136,20 +133,12 @@ mod tests {
// THIS IS NOT A RELIABLE TEST // THIS IS NOT A RELIABLE TEST
// Just quick sanity check // Just quick sanity check
fn test_start_dl() { fn test_start_dl() {
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let url = "http://www.newrustacean.com/feed.xml"; let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source // Create and index a source
let source = Source::from_url(url).unwrap(); let source = Source::from_url(url).unwrap();
// Copy it's id // Copy it's id
let sid = source.id(); let sid = source.id();
// Convert Source it into a future Feed and index it pipeline(vec![source], true).unwrap();
let future = source.into_feed(&client, true);
let feed = core.run(future).unwrap();
feed.index().unwrap();
// Get the Podcast // Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap(); let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();

View File

@ -83,30 +83,19 @@ mod tests {
use super::*; use super::*;
use hammond_data::Source; use hammond_data::Source;
use hammond_data::dbqueries; use hammond_data::dbqueries;
use hammond_data::pipeline::pipeline;
use hyper::Client;
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
#[test] #[test]
// This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit // This test inserts an rss feed to your `XDG_DATA/hammond/hammond.db` so we make it explicit
// to run it. // to run it.
#[ignore] #[ignore]
fn test_get_pixbuf_from_path() { fn test_get_pixbuf_from_path() {
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let url = "http://www.newrustacean.com/feed.xml"; let url = "http://www.newrustacean.com/feed.xml";
// Create and index a source // Create and index a source
let source = Source::from_url(url).unwrap(); let source = Source::from_url(url).unwrap();
// Copy it's id // Copy it's id
let sid = source.id(); let sid = source.id();
// Convert Source it into a future Feed and index it pipeline(vec![source], true).unwrap();
let future = source.into_feed(&client, true);
let feed = core.run(future).unwrap();
feed.index().unwrap();
// Get the Podcast // Get the Podcast
let pd = dbqueries::get_podcast_from_source_id(sid).unwrap(); let pd = dbqueries::get_podcast_from_source_id(sid).unwrap();