h-data: Implement a tail-recursion loop to follow redirects.
Follow HTTP 301 permanent redirects by using futures::future::loop_fn. It's kinda funky: match_status still returns status codes as errors, and a new DataError variant had to be added to distinguish when we should Loop::Continue. This could be cleaned up a lot.
parent 87421ce74d
commit c6a24e839a
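
The core shape of the loop, as a minimal standalone sketch (futures 0.1): `fetch` below is a hypothetical stand-in for the real `request_constructor`, assumed to resolve with the response on success and to fail with `DataError::F301(source)` once `handle_301` has written the new uri; `Source`, `Response`, and `DataError` are the crate's own types.

    use futures::future::{loop_fn, Future, Loop};

    // Sketch only: keep re-issuing the request until something other
    // than a 301 comes back.
    fn fetch_following_redirects(source: Source) -> Box<Future<Item = Response, Error = DataError>> {
        Box::new(loop_fn(source, |source| {
            // `fetch` is hypothetical; the real code calls request_constructor.
            fetch(source).then(|res| match res {
                // Success: break out of the loop with the final response.
                Ok(resp) => Ok(Loop::Break(resp)),
                // 301: the error variant carries the updated Source back,
                // so feed it into the next iteration of the loop.
                Err(DataError::F301(s)) => Ok(Loop::Continue(s)),
                // Anything else aborts the loop.
                Err(e) => Err(e),
            })
        }))
    }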
@@ -8,6 +8,26 @@ use url;
 
 use std::io;
 
+use models::Source;
+
+#[fail(display = "Request to {} returned {}. Context: {}", url, status_code, context)]
+#[derive(Fail, Debug)]
+pub struct HttpStatusError {
+    url: String,
+    status_code: hyper::StatusCode,
+    context: String,
+}
+
+impl HttpStatusError {
+    pub fn new(url: String, code: hyper::StatusCode, context: String) -> Self {
+        HttpStatusError {
+            url,
+            status_code: code,
+            context,
+        }
+    }
+}
+
 #[derive(Fail, Debug)]
 pub enum DataError {
     #[fail(display = "SQL Query failed: {}", _0)]
@@ -31,12 +51,10 @@ pub enum DataError {
     RssError(#[cause] rss::Error),
     #[fail(display = "Error: {}", _0)]
     Bail(String),
-    #[fail(display = "Request to {} returned {}. Context: {}", url, status_code, context)]
-    HttpStatusError {
-        url: String,
-        status_code: hyper::StatusCode,
-        context: String,
-    },
+    #[fail(display = "{}", _0)]
+    HttpStatusGeneral(HttpStatusError),
+    #[fail(display = "FIXME: This should be better")]
+    F301(Source),
     #[fail(display = "Error occured while Parsing an Episode. Reason: {}", reason)]
     ParseEpisodeError { reason: String, parent_id: i32 },
     #[fail(display = "No Futures where produced to be run.")]
@@ -10,10 +10,11 @@ use hyper::{Client, Method, Request, Response, StatusCode, Uri};
 use hyper_tls::HttpsConnector;
 
 // use futures::future::ok;
+use futures::future::{loop_fn, Future, Loop};
 use futures::prelude::*;
 
 use database::connection;
-use errors::DataError;
+use errors::*;
 use feed::{Feed, FeedBuilder};
 use models::{NewSource, Save};
 use schema::source;
@@ -102,11 +103,7 @@ impl Source {
     }
 
     fn make_err(self, context: &str, code: StatusCode) -> DataError {
-        DataError::HttpStatusError {
-            url: self.uri,
-            status_code: code,
-            context: context.into(),
-        }
+        DataError::HttpStatusGeneral(HttpStatusError::new(self.uri, code, context.into()))
     }
 
     // TODO match on more stuff
@@ -128,7 +125,7 @@ impl Source {
             StatusCode::MovedPermanently => {
                 error!("Feed was moved permanently.");
                 self.handle_301(&res)?;
-                return Err(self.make_err("301: Feed was moved permanently.", code));
+                return Err(DataError::F301(self));
             }
             StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."),
             StatusCode::PermanentRedirect => warn!("308: Permanent Redirect."),
@@ -151,8 +148,6 @@ impl Source {
             self.last_modified = None;
             self.save()?;
             info!("Feed url was updated succesfully.");
-            // TODO: Refresh in place instead of next time, Not a priority.
-            info!("New content will be fetched with the next refesh.");
         }
 
         Ok(())
@@ -177,12 +172,27 @@ impl Source {
     // Refactor into TryInto once it lands on stable.
     pub fn into_feed(
         self,
-        client: &Client<HttpsConnector<HttpConnector>>,
+        client: Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
     ) -> Box<Future<Item = Feed, Error = DataError>> {
         let id = self.id();
-        let feed = self.request_constructor(client, ignore_etags)
-            .and_then(move |(_, res)| response_to_channel(res))
+        let response = loop_fn(self, move |source| {
+            source
+                .request_constructor(client.clone(), ignore_etags)
+                .then(|res| match res {
+                    Ok(s) => Ok(Loop::Break(s)),
+                    Err(err) => match err {
+                        DataError::F301(s) => {
+                            info!("Following redirect...");
+                            Ok(Loop::Continue(s))
+                        }
+                        e => Err(e),
+                    },
+                })
+        });
+
+        let feed = response
+            .and_then(|(_, res)| response_to_channel(res))
             .and_then(move |chan| {
                 FeedBuilder::default()
                     .channel(chan)
@@ -198,7 +208,7 @@ impl Source {
     // #bools_are_just_2variant_enmus
     fn request_constructor(
         self,
-        client: &Client<HttpsConnector<HttpConnector>>,
+        client: Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
     ) -> Box<Future<Item = (Self, Response), Error = DataError>> {
         // FIXME: remove unwrap somehow
@@ -230,7 +240,6 @@ impl Source {
         let work = client
            .request(req)
            .map_err(From::from)
-           // TODO: tail recursion loop that would follow redirects directly
           .and_then(move |res| self.match_status(res));
        Box::new(work)
     }
@@ -271,7 +280,7 @@ mod tests {
         let source = Source::from_url(url).unwrap();
         let id = source.id();
 
-        let feed = source.into_feed(&client, true);
+        let feed = source.into_feed(client, true);
         let feed = core.run(feed).unwrap();
 
         let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id);
@@ -19,6 +19,25 @@ use Source;
 
 // use std::sync::{Arc, Mutex};
 
+// http://gtk-rs.org/tuto/closures
+#[macro_export]
+macro_rules! clone {
+    (@param _) => ( _ );
+    (@param $x:ident) => ( $x );
+    ($($n:ident),+ => move || $body:expr) => (
+        {
+            $( let $n = $n.clone(); )+
+            move || $body
+        }
+    );
+    ($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
+        {
+            $( let $n = $n.clone(); )+
+            move |$(clone!(@param $p),)+| $body
+        }
+    );
+}
+
 /// The pipline to be run for indexing and updating a Podcast feed that originates from
 /// `Source.uri`.
 ///
@@ -33,7 +52,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(
 ) -> Result<(), DataError> {
     let list: Vec<_> = sources
         .into_iter()
-        .map(move |s| s.into_feed(&client, ignore_etags))
+        .map(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags)))
         .map(|fut| fut.and_then(|feed| feed.index()))
         .map(|fut| fut.map(|_| ()).map_err(|err| error!("Error: {}", err)))
         .collect();
@@ -73,7 +92,7 @@ pub fn index_single_source(s: Source, ignore_etags: bool) -> Result<(), DataErro
         .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
         .build(&handle);
 
-    let work = s.into_feed(&client, ignore_etags)
+    let work = s.into_feed(client, ignore_etags)
         .and_then(move |feed| feed.index())
         .map(|_| ());
 
@@ -140,34 +140,11 @@ fn refresh_feed(source: Option<Vec<Source>>, sender: Sender<Action>) -> Result<(
     sender.send(Action::HeaderBarShowUpdateIndicator)?;
 
     rayon::spawn(move || {
-        let mut sources = source.unwrap_or_else(|| {
+        let sources = source.unwrap_or_else(|| {
             dbqueries::get_sources().expect("Failed to retrieve Sources from the database.")
         });
 
-        // Work around to improve the feed addition experience.
-        // Many times links to rss feeds are just redirects(usually to an https
-        // version). Sadly I haven't figured yet a nice way to follow up links
-        // redirects without getting to lifetime hell with futures and hyper.
-        // So the requested refresh is only of 1 feed, and the feed fails to be indexed,
-        // (as a 301 redict would update the source entry and exit), another refresh is
-        // run. For more see hammond_data/src/models/source.rs `fn
-        // request_constructor`. also ping me on irc if or open an issue if you
-        // want to tackle it.
-        if sources.len() == 1 {
-            let source = sources.remove(0);
-            let id = source.id();
-            if let Err(err) = pipeline::index_single_source(source, false) {
-                error!("Error While trying to update the database.");
-                error!("Error msg: {}", err);
-                if let Ok(source) = dbqueries::get_source_from_id(id) {
-                    if let Err(err) = pipeline::index_single_source(source, false) {
-                        error!("Error While trying to update the database.");
-                        error!("Error msg: {}", err);
-                    }
-                }
-            }
-            // This is what would normally run
-        } else if let Err(err) = pipeline::run(sources, false) {
+        if let Err(err) = pipeline::run(sources, false) {
             error!("Error While trying to update the database.");
             error!("Error msg: {}", err);
         }