From a5fd79e220cf7dcc23d59d632603517e9922ad3e Mon Sep 17 00:00:00 2001
From: Jordan Petridis
Date: Sat, 13 Jan 2018 14:47:38 +0200
Subject: [PATCH] I hate futures error handling.

---
 hammond-data/benches/bench.rs         | 50 +++++++++++++------
 hammond-data/src/models/queryables.rs | 71 ++++++++++++++++++++-------
 hammond-data/src/pipeline.rs          | 28 -----------
 3 files changed, 86 insertions(+), 63 deletions(-)

diff --git a/hammond-data/benches/bench.rs b/hammond-data/benches/bench.rs
index d6dd37d..bcbc7c0 100644
--- a/hammond-data/benches/bench.rs
+++ b/hammond-data/benches/bench.rs
@@ -1,6 +1,5 @@
 #![feature(test)]
 
-extern crate diesel;
 extern crate futures;
 extern crate hammond_data;
 extern crate hyper;
@@ -8,11 +7,10 @@ extern crate hyper_tls;
 extern crate rand;
 extern crate rayon;
 extern crate rss;
-extern crate tempdir;
 extern crate test;
 extern crate tokio_core;
 
-use rayon::prelude::*;
+// use rayon::prelude::*;
 
 use test::Bencher;
 
@@ -42,8 +40,26 @@ static URLS: &[(&[u8], &str)] = &[
     (LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"),
 ];
 
+static URLS2: &[&str] = &[
+    "https://feeds.feedburner.com/InterceptedWithJeremyScahill",
+    "http://www.badvoltage.org/feed/ogg/",
+    "https://www.theguardian.com/news/series/the-audio-long-read/podcast.xml",
+    "http://feeds.feedburner.com/coderradiomp3",
+    "https://rss.art19.com/steal-the-stars",
+    "https://feeds.mozilla-podcasts.org/irl",
+    "http://economicupdate.libsyn.com/rss",
+    "http://feeds.feedburner.com/linuxunplugged",
+    "http://ubuntupodcast.org/feed/ogg/",
+    "http://www.newrustacean.com/feed.xml",
+    "http://feeds.propublica.org/propublica/podcast",
+    "https://rss.acast.com/thetipoff",
+    "http://feeds.soundcloud.com/users/soundcloud:users:277306156/sounds.rss",
+    "http://revolutionspodcast.libsyn.com/rss/",
+    "https://www.greaterthancode.com/feed/podcast",
+];
+
 fn index_urls() {
-    let feeds: Vec<_> = URLS.par_iter()
+    let feeds: Vec<_> = URLS.iter()
         .map(|&(buff, url)| {
             // Create and insert a Source into db
             let s = Source::from_url(url).unwrap();
@@ -53,7 +69,7 @@
         })
         .collect();
 
-    feeds.par_iter().for_each(|x| index(x));
+    feeds.iter().for_each(|x| index(x));
 }
 
 #[bench]
@@ -77,13 +93,14 @@ fn bench_index_unchanged_feeds(b: &mut Bencher) {
 
 #[bench]
 fn bench_get_normal_feeds(b: &mut Bencher) {
+    // Index first so it will only bench the comparison test case.
     truncate_db().unwrap();
 
     b.iter(|| {
-        URLS.iter().for_each(|&(_, url)| {
+        URLS2.iter().for_each(|url| {
             let mut s = Source::from_url(url).unwrap();
-            s.into_feed(true).unwrap();
-        })
+            let _feed = s.into_feed(true);
+        });
     });
 }
 
@@ -93,20 +110,21 @@
 #[bench]
 fn bench_get_future_feeds(b: &mut Bencher) {
     b.iter(|| {
         let mut core = Core::new().unwrap();
-        let mut handle = core.handle();
-        let mut client = Client::configure()
+        let handle = core.handle();
+        let client = Client::configure()
             .connector(HttpsConnector::new(4, &handle).unwrap())
             .build(&handle);
-        let mut foo: Vec<_>;
+        let mut foo = vec![];
 
-        URLS.iter().for_each(|&(_, url)| {
-            let mut s = Source::from_url(url).unwrap();
-            let future = s.into_fututre_feed(&mut client, true);
+        URLS2.iter().for_each(|url| {
+            let s = Source::from_url(url).unwrap();
+            let future = s.into_fututre_feed(&client, true);
             foo.push(future);
         });
 
         let work = join_all(foo);
-        core.run(work).unwrap();
-    });
+        let res = core.run(work);
+        assert!(res.is_ok());
+    })
 }
diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs
index 7c40be0..528cf82 100644
--- a/hammond-data/src/models/queryables.rs
+++ b/hammond-data/src/models/queryables.rs
@@ -15,8 +15,7 @@
 use hyper::Uri;
 use hyper_tls::HttpsConnector;
 // use hyper::header::{ETag, LastModified};
-use futures::{Future, Stream};
-// use futures::future::join_all;
+use futures::prelude::*;
 
 use schema::{episode, podcast, source};
 use feed::Feed;
@@ -596,7 +595,7 @@ pub struct Source {
     pub http_etag: Option<String>,
 }
 
-impl<'a> Source {
+impl Source {
     /// Get the source `id` column.
     pub fn id(&self) -> i32 {
         self.id
@@ -659,7 +658,7 @@ impl<'a> Source {
     }
 
     /// Docs
-    pub fn update_etag2(&mut self, req: &hyper::Response) -> Result<()> {
+    pub fn update_etag2(mut self, req: &hyper::Response) -> Result<()> {
         let headers = req.headers();
 
         let etag = headers.get::<ETag>();
@@ -745,18 +744,21 @@ impl<'a> Source {
     #[allow(dead_code)]
     /// Docs
     pub fn into_fututre_feed(
-        &'a mut self,
-        client: &'a mut Client<HttpsConnector<HttpConnector>>,
+        self,
+        client: &Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
-    ) -> Box<Future<Item = Feed, Error = hyper::Error> + 'a> {
+    ) -> Box<Future<Item = Feed, Error = Error>> {
         let id = self.id();
         let feed = request_constructor(&self, client, ignore_etags)
             .map(move |res| {
-                println!("Status: {}", res.status());
-                self.update_etag2(&res).unwrap();
+                if let Err(err) = self.update_etag2(&res) {
+                    error!("Failed to update Source struct with new etag values");
+                    error!("Error: {}", err);
+                };
                 res
             })
-            .and_then(move |res| response_to_channel(res))
+            .map_err(From::from)
+            .and_then(|res| response_to_channel(res))
             .map(move |chan| Feed::from_channel_source(chan, id));
 
         Box::new(feed)
@@ -770,11 +772,12 @@
 
 fn request_constructor(
     s: &Source,
-    client: &mut Client<HttpsConnector<HttpConnector>>,
+    client: &Client<HttpsConnector<HttpConnector>>,
     ignore_etags: bool,
-) -> Box<Future<Item = hyper::Response, Error = hyper::Error>> {
+) -> Box<Future<Item = hyper::Response, Error = Error>> {
     use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch};
 
+    // FIXME: remove unwrap
     let uri = Uri::from_str(&s.uri()).unwrap();
     let mut req = hyper::Request::new(Method::Get, uri);
 
@@ -792,15 +795,45 @@
         }
     }
-    let work = client.request(req);
+    let work = client.request(req).map_err(From::from);
 
     Box::new(work)
 }
 
-fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = hyper::Error>> {
-    let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| {
-        let utf_8_bytes = iter.collect::<Vec<u8>>();
-        let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
-        Channel::from_str(&buf).unwrap()
-    });
+fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = Error>> {
+    let chan = res.body()
+        .concat2()
+        .map(|x| x.into_iter())
+        .map_err(From::from)
+        .and_then(|iter| {
+            let utf_8_bytes = iter.collect::<Vec<u8>>();
+            let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
+            let chan = Channel::from_str(&buf).map_err(From::from);
+            chan
+        });
     Box::new(chan)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tokio_core::reactor::Core;
+
+    use database::truncate_db;
+
+    #[test]
+    fn test_into_future_feed() {
+        truncate_db().unwrap();
+
+        let mut core = Core::new().unwrap();
+        let client = Client::configure()
+            .connector(HttpsConnector::new(4, &core.handle()).unwrap())
+            .build(&core.handle());
+
+        let url = "http://www.newrustacean.com/feed.xml";
+        let source = Source::from_url(url).unwrap();
+
+        let feed = source.into_fututre_feed(&client, true);
+
+        assert!(core.run(feed).is_ok());
+    }
+}
diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs
index b9d4dab..8b13789 100644
--- a/hammond-data/src/pipeline.rs
+++ b/hammond-data/src/pipeline.rs
@@ -1,29 +1 @@
-#[cfg(test)]
-mod tests {
-    // use futures::future::result;
-    use tokio_core::reactor::Core;
-    use hyper::Client;
-    use hyper_tls::HttpsConnector;
-
-    use database::truncate_db;
-    use Source;
-
-    #[test]
-    fn test_bar() {
-        truncate_db().unwrap();
-        let mut core = Core::new().unwrap();
-        let mut client = Client::configure()
-            .connector(HttpsConnector::new(4, &core.handle()).unwrap())
-            .build(&core.handle());
-
-        let url = "https://feeds.feedburner.com/InterceptedWithJeremyScahill";
-        let mut source = Source::from_url(url).unwrap();
-
-        let feed = source.into_fututre_feed(&mut client, false);
-
-        let f = core.run(feed).unwrap();
-
-        println!("{:?}", f);
-    }
-}
 
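Note (not part of the patch): the error-handling pattern this change settles on is to convert each library error into the crate-level Error with `map_err(From::from)` before chaining with `and_then`, relying on `From` impls presumably defined elsewhere in the crate. The following is a minimal, standalone sketch of that shape; `HypotheticalError` and the `fmt::Error` source are stand-ins of my own, not names from hammond-data, which instead converts `hyper::Error` and the rss parse error into its own `Error`.

// Sketch only: `HypotheticalError` stands in for the crate-level `Error`,
// `std::fmt::Error` stands in for a library error such as `hyper::Error`.
extern crate futures;

use futures::{future, Future};
use std::fmt;

#[derive(Debug)]
struct HypotheticalError(String);

// One `From` impl per library error lets `map_err(From::from)` funnel every
// failure into the single crate-level error type.
impl From<fmt::Error> for HypotheticalError {
    fn from(e: fmt::Error) -> Self {
        HypotheticalError(format!("step failed: {:?}", e))
    }
}

fn main() {
    // A future that fails with the "library" error type.
    let step = future::err::<u32, fmt::Error>(fmt::Error);

    // Convert the error type first, then keep chaining with `and_then`,
    // the same shape used in `into_fututre_feed` above. The patch writes the
    // conversion as `map_err(From::from)` and lets inference pick the impl.
    let chained = step
        .map_err(HypotheticalError::from)
        .and_then(|n| future::ok::<u32, HypotheticalError>(n + 1));

    // futures 0.1: `wait()` drives the future on the current thread.
    match chained.wait() {
        Ok(v) => println!("ok: {}", v),
        Err(e) => println!("err: {:?}", e),
    }
}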