From c6a24e839a8ba77d09673f299cfc1e64ba7078f3 Mon Sep 17 00:00:00 2001
From: Jordan Petridis
Date: Sat, 14 Apr 2018 05:30:29 +0300
Subject: [PATCH] h-data: Implement a tail-recursion loop to follow redirects.

Follow HTTP 301 permanent redirects by using a futures::future::loop_fn.

It's kinda funky: match_status still returns status codes as errors, and
a new DataError variant had to be added to distinguish when we should
Loop::Continue. This could be cleaned up a lot. (A standalone sketch of
the loop shape is appended after the diff.)
---
 hammond-data/src/errors.rs        | 30 +++++++++++++++++++-----
 hammond-data/src/models/source.rs | 39 +++++++++++++++++++------------
 hammond-data/src/pipeline.rs      | 23 ++++++++++++++++--
 hammond-gtk/src/utils.rs          | 27 ++-------------------
 4 files changed, 71 insertions(+), 48 deletions(-)

diff --git a/hammond-data/src/errors.rs b/hammond-data/src/errors.rs
index 1b87ca0..9054ef3 100644
--- a/hammond-data/src/errors.rs
+++ b/hammond-data/src/errors.rs
@@ -8,6 +8,26 @@ use url;
 
 use std::io;
 
+use models::Source;
+
+#[fail(display = "Request to {} returned {}. Context: {}", url, status_code, context)]
+#[derive(Fail, Debug)]
+pub struct HttpStatusError {
+    url: String,
+    status_code: hyper::StatusCode,
+    context: String,
+}
+
+impl HttpStatusError {
+    pub fn new(url: String, code: hyper::StatusCode, context: String) -> Self {
+        HttpStatusError {
+            url,
+            status_code: code,
+            context,
+        }
+    }
+}
+
 #[derive(Fail, Debug)]
 pub enum DataError {
     #[fail(display = "SQL Query failed: {}", _0)]
@@ -31,12 +51,10 @@ pub enum DataError {
     RssError(#[cause] rss::Error),
     #[fail(display = "Error: {}", _0)]
     Bail(String),
-    #[fail(display = "Request to {} returned {}. Context: {}", url, status_code, context)]
-    HttpStatusError {
-        url: String,
-        status_code: hyper::StatusCode,
-        context: String,
-    },
+    #[fail(display = "{}", _0)]
+    HttpStatusGeneral(HttpStatusError),
+    #[fail(display = "FIXME: This should be better")]
+    F301(Source),
     #[fail(display = "Error occured while Parsing an Episode. Reason: {}", reason)]
     ParseEpisodeError { reason: String, parent_id: i32 },
     #[fail(display = "No Futures where produced to be run.")]
diff --git a/hammond-data/src/models/source.rs b/hammond-data/src/models/source.rs
index c4409c6..0187d32 100644
--- a/hammond-data/src/models/source.rs
+++ b/hammond-data/src/models/source.rs
@@ -10,10 +10,11 @@ use hyper::{Client, Method, Request, Response, StatusCode, Uri};
 use hyper_tls::HttpsConnector;
 
 // use futures::future::ok;
+use futures::future::{loop_fn, Future, Loop};
 use futures::prelude::*;
 
 use database::connection;
-use errors::DataError;
+use errors::*;
 use feed::{Feed, FeedBuilder};
 use models::{NewSource, Save};
 use schema::source;
@@ -102,11 +103,7 @@ impl Source {
     }
 
     fn make_err(self, context: &str, code: StatusCode) -> DataError {
-        DataError::HttpStatusError {
-            url: self.uri,
-            status_code: code,
-            context: context.into(),
-        }
+        DataError::HttpStatusGeneral(HttpStatusError::new(self.uri, code, context.into()))
     }
 
     // TODO match on more stuff
@@ -128,7 +125,7 @@ impl Source {
             StatusCode::MovedPermanently => {
                 error!("Feed was moved permanently.");
                 self.handle_301(&res)?;
-                return Err(self.make_err("301: Feed was moved permanently.", code));
+                return Err(DataError::F301(self));
             }
             StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."),
             StatusCode::PermanentRedirect => warn!("308: Permanent Redirect."),
@@ -151,8 +148,6 @@ impl Source {
             self.last_modified = None;
             self.save()?;
             info!("Feed url was updated succesfully.");
-            // TODO: Refresh in place instead of next time, Not a priority.
-            info!("New content will be fetched with the next refesh.");
         }
 
         Ok(())
@@ -177,12 +172,27 @@ impl Source {
     // Refactor into TryInto once it lands on stable.
     pub fn into_feed(
         self,
-        client: &Client<HttpsConnector<HttpConnector>>,
+        client: Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
     ) -> Box<Future<Item = Feed, Error = DataError>> {
         let id = self.id();
-        let feed = self.request_constructor(client, ignore_etags)
-            .and_then(move |(_, res)| response_to_channel(res))
+        let response = loop_fn(self, move |source| {
+            source
+                .request_constructor(client.clone(), ignore_etags)
+                .then(|res| match res {
+                    Ok(s) => Ok(Loop::Break(s)),
+                    Err(err) => match err {
+                        DataError::F301(s) => {
+                            info!("Following redirect...");
+                            Ok(Loop::Continue(s))
+                        }
+                        e => Err(e),
+                    },
+                })
+        });
+
+        let feed = response
+            .and_then(|(_, res)| response_to_channel(res))
             .and_then(move |chan| {
                 FeedBuilder::default()
                     .channel(chan)
@@ -198,7 +208,7 @@ impl Source {
     // #bools_are_just_2variant_enmus
     fn request_constructor(
         self,
-        client: &Client<HttpsConnector<HttpConnector>>,
+        client: Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
     ) -> Box<Future<Item = (Self, Response), Error = DataError>> {
         // FIXME: remove unwrap somehow
@@ -230,7 +240,6 @@ impl Source {
         let work = client
             .request(req)
             .map_err(From::from)
-            // TODO: tail recursion loop that would follow redirects directly
             .and_then(move |res| self.match_status(res));
         Box::new(work)
     }
@@ -271,7 +280,7 @@ mod tests {
         let source = Source::from_url(url).unwrap();
         let id = source.id();
 
-        let feed = source.into_feed(&client, true);
+        let feed = source.into_feed(client, true);
         let feed = core.run(feed).unwrap();
 
         let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id);
diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs
index e2845b1..bc9a7f3 100644
--- a/hammond-data/src/pipeline.rs
+++ b/hammond-data/src/pipeline.rs
@@ -19,6 +19,25 @@ use Source;
 
 // use std::sync::{Arc, Mutex};
 
+// http://gtk-rs.org/tuto/closures
+#[macro_export]
+macro_rules! clone {
+    (@param _) => ( _ );
+    (@param $x:ident) => ( $x );
+    ($($n:ident),+ => move || $body:expr) => (
+        {
+            $( let $n = $n.clone(); )+
+            move || $body
+        }
+    );
+    ($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
+        {
+            $( let $n = $n.clone(); )+
+            move |$(clone!(@param $p),)+| $body
+        }
+    );
+}
+
 /// The pipline to be run for indexing and updating a Podcast feed that originates from
 /// `Source.uri`.
 ///
@@ -33,7 +52,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
 ) -> Result<(), DataError> {
     let list: Vec<_> = sources
         .into_iter()
-        .map(move |s| s.into_feed(&client, ignore_etags))
+        .map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags)))
         .map(|fut| fut.and_then(|feed| feed.index()))
         .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
         .collect();
@@ -73,7 +92,7 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataErro
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
 
-    let work = s.into_feed(&client, ignore_etags)
+    let work = s.into_feed(client, ignore_etags)
         .and_then(move |feed| feed.index())
         .map(|_| ());
 
diff --git a/hammond-gtk/src/utils.rs b/hammond-gtk/src/utils.rs
index 906d0fb..e387773 100644
--- a/hammond-gtk/src/utils.rs
+++ b/hammond-gtk/src/utils.rs
@@ -140,34 +140,11 @@ fn refresh_feed(source: Option<Vec<Source>>, sender: Sender<Action>) -> Result<(
     sender.send(Action::HeaderBarShowUpdateIndicator)?;
 
     rayon::spawn(move || {
-        let mut sources = source.unwrap_or_else(|| {
+        let sources = source.unwrap_or_else(|| {
             dbqueries::get_sources().expect("Failed to retrieve Sources from the database.")
         });
 
-        // Work around to improve the feed addition experience.
- // Many times links to rss feeds are just redirects(usually to an https - // version). Sadly I haven't figured yet a nice way to follow up links - // redirects without getting to lifetime hell with futures and hyper. - // So the requested refresh is only of 1 feed, and the feed fails to be indexed, - // (as a 301 redict would update the source entry and exit), another refresh is - // run. For more see hammond_data/src/models/source.rs `fn - // request_constructor`. also ping me on irc if or open an issue if you - // want to tackle it. - if sources.len() == 1 { - let source = sources.remove(0); - let id = source.id(); - if let Err(err) = pipeline::index_single_source(source, false) { - error!("Error While trying to update the database."); - error!("Error msg: {}", err); - if let Ok(source) = dbqueries::get_source_from_id(id) { - if let Err(err) = pipeline::index_single_source(source, false) { - error!("Error While trying to update the database."); - error!("Error msg: {}", err); - } - } - } - // This is what would normally run - } else if let Err(err) = pipeline::run(sources, false) { + if let Err(err) = pipeline::run(sources, false) { error!("Error While trying to update the database."); error!("Error msg: {}", err); }
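
A minimal standalone sketch of the loop_fn pattern used above, assuming futures 0.1: keep looping while the "error" signals a 301 redirect, break on success, and propagate any other error. It is not part of the patch; fetch, FetchError, and the example hosts are hypothetical stand-ins for Source::request_constructor, DataError::F301(Source), and a real feed URL, and the real code drives hyper request futures rather than a plain Result.

// Standalone sketch, not part of the patch: the futures 0.1 loop_fn shape
// used by Source::into_feed above. `fetch` and `FetchError` are hypothetical
// stand-ins for `request_constructor` and `DataError`.
extern crate futures; // futures = "0.1"

use futures::future::{loop_fn, Future, Loop};

#[derive(Debug)]
enum FetchError {
    Redirected(String), // plays the role of DataError::F301(Source)
    Other(String),
}

// Fake request: every URL on the old host "redirects" to the new host.
fn fetch(url: String) -> Result<String, FetchError> {
    if url.starts_with("http://old.example.com") {
        Err(FetchError::Redirected(url.replacen(
            "http://old.example.com",
            "https://new.example.com",
            1,
        )))
    } else if !url.starts_with("http") {
        Err(FetchError::Other(format!("unsupported url: {}", url)))
    } else {
        Ok(format!("body of {}", url))
    }
}

fn main() {
    // The loop state is the URL to try next: a redirect "error" continues
    // the loop with the new URL, success breaks out, anything else errors.
    let work = loop_fn(String::from("http://old.example.com/feed.xml"), |url| {
        futures::future::result(fetch(url)).then(|res| match res {
            Ok(body) => Ok(Loop::Break(body)),
            Err(FetchError::Redirected(new_url)) => Ok(Loop::Continue(new_url)),
            Err(e) => Err(e),
        })
    });

    println!("{:?}", work.wait());
}

The same shape is what into_feed builds: request_constructor plays the part of fetch, and the F301 variant carries the updated Source back into the next iteration of the loop.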