diff --git a/Cargo.lock b/Cargo.lock index 460f6d0..9794c2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1572,6 +1572,7 @@ dependencies = [ "pretty_assertions 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.9.22 (registry+https://github.com/rust-lang/crates.io-index)", "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index e750587..0e66aa6 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -17,6 +17,7 @@ url = "2.1.0" xdg = "2.2.0" xml-rs = "0.8.0" futures = "0.1.29" +reqwest = "0.9" hyper = "0.12.35" http = "0.1.19" tokio = "0.1.22" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 6559fa1..07517c8 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -21,8 +21,7 @@ use diesel; use diesel::r2d2; use diesel_migrations::RunMigrationsError; use http; -use hyper; -use native_tls; +use reqwest; use rss; use url; use xml; @@ -38,12 +37,12 @@ use crate::models::Source; #[derive(Fail, Debug)] pub struct HttpStatusError { url: String, - status_code: hyper::StatusCode, + status_code: reqwest::StatusCode, context: String, } impl HttpStatusError { - pub fn new(url: String, code: hyper::StatusCode, context: String) -> Self { + pub fn new(url: String, code: reqwest::StatusCode, context: String) -> Self { HttpStatusError { url, status_code: code, @@ -62,14 +61,12 @@ pub enum DataError { R2D2Error(#[cause] r2d2::Error), #[fail(display = "R2D2 Pool error: {}", _0)] R2D2PoolError(#[cause] r2d2::PoolError), - #[fail(display = "Hyper Error: {}", _0)] - HyperError(#[cause] hyper::Error), + #[fail(display = 
"Reqwest Error: {}", _0)] + ReqwestError(#[cause] reqwest::Error), #[fail(display = "ToStr Error: {}", _0)] HttpToStr(#[cause] http::header::ToStrError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), - #[fail(display = "TLS Error: {}", _0)] - TLSError(#[cause] native_tls::Error), #[fail(display = "IO Error: {}", _0)] IOError(#[cause] io::Error), #[fail(display = "RSS Error: {}", _0)] @@ -111,10 +108,9 @@ easy_from_impl!( diesel::result::Error => DataError::DieselResultError, r2d2::Error => DataError::R2D2Error, r2d2::PoolError => DataError::R2D2PoolError, - hyper::Error => DataError::HyperError, + reqwest::Error => DataError::ReqwestError, http::header::ToStrError => DataError::HttpToStr, url::ParseError => DataError::UrlError, - native_tls::Error => DataError::TLSError, io::Error => DataError::IOError, rss::Error => DataError::RssError, xml::reader::Error => DataError::XmlReaderError, diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index 509be15..55d5668 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -22,17 +22,15 @@ use diesel::SaveChangesDsl; use rss::Channel; use url::Url; -use hyper::client::HttpConnector; -use hyper::{Body, Client}; -use hyper_tls::HttpsConnector; +use reqwest::r#async::{Client, Response}; +// use reqwest::StatusCode; use http::header::{ - HeaderValue, AUTHORIZATION, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, LOCATION, + HeaderValue, AUTHORIZATION, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, USER_AGENT as USER_AGENT_HEADER, }; -use http::{Request, Response, StatusCode, Uri}; // use futures::future::ok; -use futures::future::{loop_fn, Future, Loop}; +use futures::future::Future; use futures::prelude::*; use base64::{encode_config, URL_SAFE}; @@ -114,7 +112,7 @@ impl Source { /// Extract Etag and LastModifier from res, and update self and the /// corresponding db row. 
- fn update_etag(mut self, res: &Response) -> Result { + fn update_etag(mut self, res: &Response) -> Result { let headers = res.headers(); let etag = headers @@ -144,21 +142,11 @@ impl Source { self.last_modified = None; } - fn make_err(self, context: &str, code: StatusCode) -> DataError { - DataError::HttpStatusGeneral(HttpStatusError::new(self.uri, code, context.into())) - } - - // TODO match on more stuff // 301: Moved Permanently // 304: Up to date Feed, checked with the Etag // 307: Temporary redirect of the url // 308: Permanent redirect of the url - // 401: Unathorized - // 403: Forbidden - // 408: Timeout - // 410: Feed deleted - // TODO: Rething this api, - fn match_status(mut self, res: Response) -> Result, DataError> { + fn match_status(mut self, res: Response) -> Result { let code = res.status(); if code.is_success() { @@ -181,46 +169,31 @@ impl Source { info!("304: Source, (id: {}), is up to date", self.id()); return Err(DataError::FeedNotModified(self)); } - 301 | 302 | 308 => { - warn!("Feed was moved permanently."); - self = self.update_url(&res)?; - return Err(DataError::FeedRedirect(self)); - } - 307 => { - warn!("307: Temporary Redirect."); - // FIXME: How is it actually handling the redirect? 
- return Err(DataError::FeedRedirect(self)); - } - 401 => return Err(self.make_err("401: Unauthorized.", code)), - 403 => return Err(self.make_err("403: Forbidden.", code)), - 404 => return Err(self.make_err("404: Not found.", code)), - 408 => return Err(self.make_err("408: Request Timeout.", code)), - 410 => return Err(self.make_err("410: Feed was deleted..", code)), _ => info!("HTTP StatusCode: {}", code), }; Ok(res) } - fn update_url(mut self, res: &Response) -> Result { - let code = res.status(); - let headers = res.headers(); - info!("HTTP StatusCode: {}", code); - debug!("Headers {:#?}", headers); + // fn update_url(mut self, res: &Response) -> Result { + // let code = res.status(); + // let headers = res.headers(); + // info!("HTTP StatusCode: {}", code); + // debug!("Headers {:#?}", headers); - if let Some(url) = headers.get(LOCATION) { - debug!("Previous Source: {:#?}", &self); + // if let Some(url) = headers.get(LOCATION) { + // debug!("Previous Source: {:#?}", &self); - self.set_uri(url.to_str()?.into()); - self.clear_etags(); - self = self.save()?; + // self.set_uri(url.to_str()?.into()); + // self.clear_etags(); + // self = self.save()?; - debug!("Updated Source: {:#?}", &self); - info!("Feed url of Source {}, was updated succesfully.", self.id()); - } + // debug!("Updated Source: {:#?}", &self); + // info!("Feed url of Source {}, was updated succesfully.", self.id()); + // } - Ok(self) - } + // Ok(self) + // } /// Construct a new `Source` with the given `uri` and index it. /// @@ -239,27 +212,10 @@ impl Source { /// /// Consumes `self` and Returns the corresponding `Feed` Object. // Refactor into TryInto once it lands on stable. 
- pub fn into_feed( - self, - client: Client>, - ) -> impl Future { + pub fn into_feed(self, client: Client) -> impl Future { let id = self.id(); - let response = loop_fn(self, move |source| { - source - .request_constructor(&client.clone()) - .then(|res| match res { - Ok(response) => Ok(Loop::Break(response)), - Err(err) => match err { - DataError::FeedRedirect(s) => { - info!("Following redirect..."); - Ok(Loop::Continue(s)) - } - e => Err(e), - }, - }) - }); - response + self.request_constructor(&client.clone()) .and_then(response_to_channel) .and_then(move |chan| { FeedBuilder::default() @@ -272,11 +228,12 @@ impl Source { fn request_constructor( self, - client: &Client>, - ) -> impl Future, Error = DataError> { - // FIXME: remove unwrap somehow - let uri = Uri::from_str(self.uri()).unwrap(); - let mut req = Request::get(uri).body(Body::empty()).unwrap(); + client: &Client, + ) -> impl Future { + let mut req = client + .get(self.uri()) + // Set the UserAgent cause ppl still seem to check it for some reason... + .header(USER_AGENT_HEADER, HeaderValue::from_static(USER_AGENT)); if let Ok(url) = Url::parse(self.uri()) { if let Some(password) = url.password() { @@ -285,35 +242,25 @@ impl Source { &format!("{}:{}", url.username(), password), URL_SAFE, )); - req.headers_mut() - .insert(AUTHORIZATION, HeaderValue::from_str(&auth).unwrap()); + req = req.header(AUTHORIZATION, HeaderValue::from_str(&auth).unwrap()); } } - // Set the UserAgent cause ppl still seem to check it for some reason... 
- req.headers_mut() - .insert(USER_AGENT_HEADER, HeaderValue::from_static(USER_AGENT)); - if let Some(etag) = self.http_etag() { - req.headers_mut() - .insert(IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap()); + req = req.header(IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap()); } if let Some(lmod) = self.last_modified() { - req.headers_mut() - .insert(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap()); + req = req.header(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap()); } - client - .request(req) + req.send() .map_err(From::from) .and_then(move |res| self.match_status(res)) } } -fn response_to_channel( - res: Response, -) -> impl Future + Send { +fn response_to_channel(res: Response) -> impl Future + Send { res.into_body() .concat2() .map(|x| x.into_iter()) @@ -327,7 +274,6 @@ fn response_to_channel( mod tests { use super::*; use failure::Error; - use num_cpus; use tokio; use crate::database::truncate_db; @@ -338,8 +284,7 @@ mod tests { truncate_db()?; let mut rt = tokio::runtime::Runtime::new()?; - let https = HttpsConnector::new(num_cpus::get())?; - let client = Client::builder().build::<_, Body>(https); + let client = reqwest::r#async::Client::new(); let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\ com/InterceptedWithJeremyScahill"; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index c3a0271..12b8d90 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -23,26 +23,20 @@ use futures::{future::ok, lazy, prelude::*, stream::FuturesUnordered}; use tokio; -use hyper::client::HttpConnector; -use hyper::{Body, Client}; -use hyper_tls::HttpsConnector; - -use num_cpus; +use reqwest::r#async::Client; use crate::errors::DataError; use crate::Source; use std::iter::FromIterator; -type HttpsClient = Client>; - /// The pipline to be run for indexing and updating a Podcast feed that originates from /// `Source.uri`. 
/// /// Messy temp diagram: /// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` -> /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. -pub fn pipeline<'a, S>(sources: S, client: HttpsClient) -> impl Future + 'a +pub fn pipeline<'a, S>(sources: S, client: Client) -> impl Future + 'a where S: Stream + Send + 'a, { @@ -67,14 +61,14 @@ where .for_each(move |_| ok(())) } -/// Creates a tokio `reactor::Core`, and a `hyper::Client` and -/// runs the pipeline to completion. The `reactor::Core` is dropped afterwards. +/// Creates a `tokio::runtime::Runtime`, and a `reqwest::async::Client` and +/// runs the pipeline to completion. The Runtime and Client are dropped afterwards. +/// FIXME: require the caller to handle the runtime and the client pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { - let https = HttpsConnector::new(num_cpus::get())?; - let client = Client::builder().build::<_, Body>(https); + let client = Client::new(); let foo = sources.into_iter().map(ok::<_, _>); let stream = FuturesUnordered::from_iter(foo);