I hate futures error handling.

This commit is contained in:
Jordan Petridis 2018-01-13 14:47:38 +02:00
parent e162f8fd3f
commit a5fd79e220
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
3 changed files with 86 additions and 63 deletions

View File

@ -1,6 +1,5 @@
#![feature(test)]
extern crate diesel;
extern crate futures;
extern crate hammond_data;
extern crate hyper;
@ -8,11 +7,10 @@ extern crate hyper_tls;
extern crate rand;
extern crate rayon;
extern crate rss;
extern crate tempdir;
extern crate test;
extern crate tokio_core;
use rayon::prelude::*;
// use rayon::prelude::*;
use test::Bencher;
@ -42,8 +40,26 @@ static URLS: &[(&[u8], &str)] = &[
(LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"),
];
static URLS2: &[&str] = &[
"https://feeds.feedburner.com/InterceptedWithJeremyScahill",
"http://www.badvoltage.org/feed/ogg/",
"https://www.theguardian.com/news/series/the-audio-long-read/podcast.xml",
"http://feeds.feedburner.com/coderradiomp3",
"https://rss.art19.com/steal-the-stars",
"https://feeds.mozilla-podcasts.org/irl",
"http://economicupdate.libsyn.com/rss",
"http://feeds.feedburner.com/linuxunplugged",
"http://ubuntupodcast.org/feed/ogg/",
"http://www.newrustacean.com/feed.xml",
"http://feeds.propublica.org/propublica/podcast",
"https://rss.acast.com/thetipoff",
"http://feeds.soundcloud.com/users/soundcloud:users:277306156/sounds.rss",
"http://revolutionspodcast.libsyn.com/rss/",
"https://www.greaterthancode.com/feed/podcast",
];
fn index_urls() {
let feeds: Vec<_> = URLS.par_iter()
let feeds: Vec<_> = URLS.iter()
.map(|&(buff, url)| {
// Create and insert a Source into db
let s = Source::from_url(url).unwrap();
@ -53,7 +69,7 @@ fn index_urls() {
})
.collect();
feeds.par_iter().for_each(|x| index(x));
feeds.iter().for_each(|x| index(x));
}
#[bench]
@ -77,13 +93,14 @@ fn bench_index_unchanged_feeds(b: &mut Bencher) {
#[bench]
fn bench_get_normal_feeds(b: &mut Bencher) {
// Index first so it will only bench the comparison test case.
truncate_db().unwrap();
b.iter(|| {
URLS.iter().for_each(|&(_, url)| {
URLS2.iter().for_each(|url| {
let mut s = Source::from_url(url).unwrap();
s.into_feed(true).unwrap();
})
let _feed = s.into_feed(true);
});
});
}
@ -93,20 +110,21 @@ fn bench_get_future_feeds(b: &mut Bencher) {
b.iter(|| {
let mut core = Core::new().unwrap();
let mut handle = core.handle();
let mut client = Client::configure()
let handle = core.handle();
let client = Client::configure()
.connector(HttpsConnector::new(4, &handle).unwrap())
.build(&handle);
let mut foo: Vec<_>;
let mut foo = vec![];
URLS.iter().for_each(|&(_, url)| {
let mut s = Source::from_url(url).unwrap();
let future = s.into_fututre_feed(&mut client, true);
URLS2.iter().for_each(|url| {
let s = Source::from_url(url).unwrap();
let future = s.into_fututre_feed(&client, true);
foo.push(future);
});
let work = join_all(foo);
core.run(work).unwrap();
});
let res = core.run(work);
assert!(res.is_ok());
})
}

View File

@ -15,8 +15,7 @@ use hyper::Uri;
use hyper_tls::HttpsConnector;
// use hyper::header::{ETag, LastModified};
use futures::{Future, Stream};
// use futures::future::join_all;
use futures::prelude::*;
use schema::{episode, podcast, source};
use feed::Feed;
@ -596,7 +595,7 @@ pub struct Source {
pub http_etag: Option<String>,
}
impl<'a> Source {
impl Source {
/// Get the source `id` column.
pub fn id(&self) -> i32 {
self.id
@ -659,7 +658,7 @@ impl<'a> Source {
}
/// Docs
pub fn update_etag2(&mut self, req: &hyper::Response) -> Result<()> {
pub fn update_etag2(mut self, req: &hyper::Response) -> Result<()> {
let headers = req.headers();
let etag = headers.get::<ETag>();
@ -745,18 +744,21 @@ impl<'a> Source {
#[allow(dead_code)]
/// Docs
pub fn into_fututre_feed(
&'a mut self,
client: &'a mut Client<HttpsConnector<HttpConnector>>,
self,
client: &Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = Feed, Error = hyper::Error> + 'a> {
) -> Box<Future<Item = Feed, Error = Error>> {
let id = self.id();
let feed = request_constructor(&self, client, ignore_etags)
.map(move |res| {
println!("Status: {}", res.status());
self.update_etag2(&res).unwrap();
if let Err(err) = self.update_etag2(&res) {
error!("Failed to update Source struct with new etag values");
error!("Error: {}", err);
};
res
})
.and_then(move |res| response_to_channel(res))
.map_err(From::from)
.and_then(|res| response_to_channel(res))
.map(move |chan| Feed::from_channel_source(chan, id));
Box::new(feed)
@ -770,11 +772,12 @@ impl<'a> Source {
fn request_constructor(
s: &Source,
client: &mut Client<HttpsConnector<HttpConnector>>,
client: &Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = hyper::Response, Error = hyper::Error>> {
) -> Box<Future<Item = hyper::Response, Error = Error>> {
use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch};
// FIXME: remove unwrap
let uri = Uri::from_str(&s.uri()).unwrap();
let mut req = hyper::Request::new(Method::Get, uri);
@ -792,15 +795,45 @@ fn request_constructor(
}
}
let work = client.request(req);
let work = client.request(req).map_err(From::from);
Box::new(work)
}
fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = hyper::Error>> {
let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| {
let utf_8_bytes = iter.collect::<Vec<u8>>();
let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
Channel::from_str(&buf).unwrap()
});
fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = Error>> {
let chan = res.body()
.concat2()
.map(|x| x.into_iter())
.map_err(From::from)
.and_then(|iter| {
let utf_8_bytes = iter.collect::<Vec<u8>>();
let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
let chan = Channel::from_str(&buf).map_err(From::from);
chan
});
Box::new(chan)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_core::reactor::Core;
use database::truncate_db;
#[test]
fn test_into_future_feed() {
truncate_db().unwrap();
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let url = "http://www.newrustacean.com/feed.xml";
let source = Source::from_url(url).unwrap();
let feed = source.into_fututre_feed(&client, true);
assert!(core.run(feed).is_ok());
}
}

View File

@ -1,29 +1 @@
#[cfg(test)]
mod tests {
// use futures::future::result;
use tokio_core::reactor::Core;
use hyper::Client;
use hyper_tls::HttpsConnector;
use database::truncate_db;
use Source;
#[test]
fn test_bar() {
truncate_db().unwrap();
let mut core = Core::new().unwrap();
let mut client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
let url = "https://feeds.feedburner.com/InterceptedWithJeremyScahill";
let mut source = Source::from_url(url).unwrap();
let feed = source.into_fututre_feed(&mut client, false);
let f = core.run(feed).unwrap();
println!("{:?}", f);
}
}