podcasts-data: Port from hyper to reqwest::async
Reqwest is a higher-level HTTP client that uses hyper underneath. It handles TLS, proxies, and other higher-level concerns for us, and will allow us to drop some glue code, such as the redirect handling, in the future.
This commit is contained in:
parent
217362fe14
commit
f58495c715
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1572,6 +1572,7 @@ dependencies = [
|
||||
"pretty_assertions 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"reqwest 0.9.22 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rss 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
||||
@ -17,6 +17,7 @@ url = "2.1.0"
|
||||
xdg = "2.2.0"
|
||||
xml-rs = "0.8.0"
|
||||
futures = "0.1.29"
|
||||
reqwest = "0.9"
|
||||
hyper = "0.12.35"
|
||||
http = "0.1.19"
|
||||
tokio = "0.1.22"
|
||||
|
||||
@ -21,8 +21,7 @@ use diesel;
|
||||
use diesel::r2d2;
|
||||
use diesel_migrations::RunMigrationsError;
|
||||
use http;
|
||||
use hyper;
|
||||
use native_tls;
|
||||
use reqwest;
|
||||
use rss;
|
||||
use url;
|
||||
use xml;
|
||||
@ -38,12 +37,12 @@ use crate::models::Source;
|
||||
#[derive(Fail, Debug)]
|
||||
pub struct HttpStatusError {
|
||||
url: String,
|
||||
status_code: hyper::StatusCode,
|
||||
status_code: reqwest::StatusCode,
|
||||
context: String,
|
||||
}
|
||||
|
||||
impl HttpStatusError {
|
||||
pub fn new(url: String, code: hyper::StatusCode, context: String) -> Self {
|
||||
pub fn new(url: String, code: reqwest::StatusCode, context: String) -> Self {
|
||||
HttpStatusError {
|
||||
url,
|
||||
status_code: code,
|
||||
@ -62,14 +61,12 @@ pub enum DataError {
|
||||
R2D2Error(#[cause] r2d2::Error),
|
||||
#[fail(display = "R2D2 Pool error: {}", _0)]
|
||||
R2D2PoolError(#[cause] r2d2::PoolError),
|
||||
#[fail(display = "Hyper Error: {}", _0)]
|
||||
HyperError(#[cause] hyper::Error),
|
||||
#[fail(display = "Reqwest Error: {}", _0)]
|
||||
ReqwestError(#[cause] reqwest::Error),
|
||||
#[fail(display = "ToStr Error: {}", _0)]
|
||||
HttpToStr(#[cause] http::header::ToStrError),
|
||||
#[fail(display = "Failed to parse a url: {}", _0)]
|
||||
UrlError(#[cause] url::ParseError),
|
||||
#[fail(display = "TLS Error: {}", _0)]
|
||||
TLSError(#[cause] native_tls::Error),
|
||||
#[fail(display = "IO Error: {}", _0)]
|
||||
IOError(#[cause] io::Error),
|
||||
#[fail(display = "RSS Error: {}", _0)]
|
||||
@ -111,10 +108,9 @@ easy_from_impl!(
|
||||
diesel::result::Error => DataError::DieselResultError,
|
||||
r2d2::Error => DataError::R2D2Error,
|
||||
r2d2::PoolError => DataError::R2D2PoolError,
|
||||
hyper::Error => DataError::HyperError,
|
||||
reqwest::Error => DataError::ReqwestError,
|
||||
http::header::ToStrError => DataError::HttpToStr,
|
||||
url::ParseError => DataError::UrlError,
|
||||
native_tls::Error => DataError::TLSError,
|
||||
io::Error => DataError::IOError,
|
||||
rss::Error => DataError::RssError,
|
||||
xml::reader::Error => DataError::XmlReaderError,
|
||||
|
||||
@ -22,17 +22,15 @@ use diesel::SaveChangesDsl;
|
||||
use rss::Channel;
|
||||
use url::Url;
|
||||
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Body, Client};
|
||||
use hyper_tls::HttpsConnector;
|
||||
use reqwest::r#async::{Client, Response};
|
||||
// use reqwest::StatusCode;
|
||||
|
||||
use http::header::{
|
||||
HeaderValue, AUTHORIZATION, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, LOCATION,
|
||||
HeaderValue, AUTHORIZATION, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED,
|
||||
USER_AGENT as USER_AGENT_HEADER,
|
||||
};
|
||||
use http::{Request, Response, StatusCode, Uri};
|
||||
// use futures::future::ok;
|
||||
use futures::future::{loop_fn, Future, Loop};
|
||||
use futures::future::Future;
|
||||
use futures::prelude::*;
|
||||
|
||||
use base64::{encode_config, URL_SAFE};
|
||||
@ -114,7 +112,7 @@ impl Source {
|
||||
|
||||
/// Extract the ETag and Last-Modified headers from res, and update self and the
|
||||
/// corresponding db row.
|
||||
fn update_etag(mut self, res: &Response<Body>) -> Result<Self, DataError> {
|
||||
fn update_etag(mut self, res: &Response) -> Result<Self, DataError> {
|
||||
let headers = res.headers();
|
||||
|
||||
let etag = headers
|
||||
@ -144,21 +142,11 @@ impl Source {
|
||||
self.last_modified = None;
|
||||
}
|
||||
|
||||
fn make_err(self, context: &str, code: StatusCode) -> DataError {
|
||||
DataError::HttpStatusGeneral(HttpStatusError::new(self.uri, code, context.into()))
|
||||
}
|
||||
|
||||
// TODO match on more stuff
|
||||
// 301: Moved Permanently
|
||||
// 304: Up to date Feed, checked with the Etag
|
||||
// 307: Temporary redirect of the url
|
||||
// 308: Permanent redirect of the url
|
||||
// 401: Unauthorized
|
||||
// 403: Forbidden
|
||||
// 408: Timeout
|
||||
// 410: Feed deleted
|
||||
// TODO: Rethink this API.
|
||||
fn match_status(mut self, res: Response<Body>) -> Result<Response<Body>, DataError> {
|
||||
fn match_status(mut self, res: Response) -> Result<Response, DataError> {
|
||||
let code = res.status();
|
||||
|
||||
if code.is_success() {
|
||||
@ -181,46 +169,31 @@ impl Source {
|
||||
info!("304: Source, (id: {}), is up to date", self.id());
|
||||
return Err(DataError::FeedNotModified(self));
|
||||
}
|
||||
301 | 302 | 308 => {
|
||||
warn!("Feed was moved permanently.");
|
||||
self = self.update_url(&res)?;
|
||||
return Err(DataError::FeedRedirect(self));
|
||||
}
|
||||
307 => {
|
||||
warn!("307: Temporary Redirect.");
|
||||
// FIXME: How is it actually handling the redirect?
|
||||
return Err(DataError::FeedRedirect(self));
|
||||
}
|
||||
401 => return Err(self.make_err("401: Unauthorized.", code)),
|
||||
403 => return Err(self.make_err("403: Forbidden.", code)),
|
||||
404 => return Err(self.make_err("404: Not found.", code)),
|
||||
408 => return Err(self.make_err("408: Request Timeout.", code)),
|
||||
410 => return Err(self.make_err("410: Feed was deleted..", code)),
|
||||
_ => info!("HTTP StatusCode: {}", code),
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn update_url(mut self, res: &Response<Body>) -> Result<Self, DataError> {
|
||||
let code = res.status();
|
||||
let headers = res.headers();
|
||||
info!("HTTP StatusCode: {}", code);
|
||||
debug!("Headers {:#?}", headers);
|
||||
// fn update_url(mut self, res: &Response) -> Result<Self, DataError> {
|
||||
// let code = res.status();
|
||||
// let headers = res.headers();
|
||||
// info!("HTTP StatusCode: {}", code);
|
||||
// debug!("Headers {:#?}", headers);
|
||||
|
||||
if let Some(url) = headers.get(LOCATION) {
|
||||
debug!("Previous Source: {:#?}", &self);
|
||||
// if let Some(url) = headers.get(LOCATION) {
|
||||
// debug!("Previous Source: {:#?}", &self);
|
||||
|
||||
self.set_uri(url.to_str()?.into());
|
||||
self.clear_etags();
|
||||
self = self.save()?;
|
||||
// self.set_uri(url.to_str()?.into());
|
||||
// self.clear_etags();
|
||||
// self = self.save()?;
|
||||
|
||||
debug!("Updated Source: {:#?}", &self);
|
||||
info!("Feed url of Source {}, was updated succesfully.", self.id());
|
||||
}
|
||||
// debug!("Updated Source: {:#?}", &self);
|
||||
// info!("Feed url of Source {}, was updated succesfully.", self.id());
|
||||
// }
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
// Ok(self)
|
||||
// }
|
||||
|
||||
/// Construct a new `Source` with the given `uri` and index it.
|
||||
///
|
||||
@ -239,27 +212,10 @@ impl Source {
|
||||
///
|
||||
/// Consumes `self` and Returns the corresponding `Feed` Object.
|
||||
// Refactor into TryInto once it lands on stable.
|
||||
pub fn into_feed(
|
||||
self,
|
||||
client: Client<HttpsConnector<HttpConnector>>,
|
||||
) -> impl Future<Item = Feed, Error = DataError> {
|
||||
pub fn into_feed(self, client: Client) -> impl Future<Item = Feed, Error = DataError> {
|
||||
let id = self.id();
|
||||
let response = loop_fn(self, move |source| {
|
||||
source
|
||||
.request_constructor(&client.clone())
|
||||
.then(|res| match res {
|
||||
Ok(response) => Ok(Loop::Break(response)),
|
||||
Err(err) => match err {
|
||||
DataError::FeedRedirect(s) => {
|
||||
info!("Following redirect...");
|
||||
Ok(Loop::Continue(s))
|
||||
}
|
||||
e => Err(e),
|
||||
},
|
||||
})
|
||||
});
|
||||
|
||||
response
|
||||
self.request_constructor(&client.clone())
|
||||
.and_then(response_to_channel)
|
||||
.and_then(move |chan| {
|
||||
FeedBuilder::default()
|
||||
@ -272,11 +228,12 @@ impl Source {
|
||||
|
||||
fn request_constructor(
|
||||
self,
|
||||
client: &Client<HttpsConnector<HttpConnector>>,
|
||||
) -> impl Future<Item = Response<Body>, Error = DataError> {
|
||||
// FIXME: remove unwrap somehow
|
||||
let uri = Uri::from_str(self.uri()).unwrap();
|
||||
let mut req = Request::get(uri).body(Body::empty()).unwrap();
|
||||
client: &Client,
|
||||
) -> impl Future<Item = Response, Error = DataError> {
|
||||
let mut req = client
|
||||
.get(self.uri())
|
||||
// Set the UserAgent cause ppl still seem to check it for some reason...
|
||||
.header(USER_AGENT_HEADER, HeaderValue::from_static(USER_AGENT));
|
||||
|
||||
if let Ok(url) = Url::parse(self.uri()) {
|
||||
if let Some(password) = url.password() {
|
||||
@ -285,35 +242,25 @@ impl Source {
|
||||
&format!("{}:{}", url.username(), password),
|
||||
URL_SAFE,
|
||||
));
|
||||
req.headers_mut()
|
||||
.insert(AUTHORIZATION, HeaderValue::from_str(&auth).unwrap());
|
||||
req = req.header(AUTHORIZATION, HeaderValue::from_str(&auth).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
// Set the UserAgent cause ppl still seem to check it for some reason...
|
||||
req.headers_mut()
|
||||
.insert(USER_AGENT_HEADER, HeaderValue::from_static(USER_AGENT));
|
||||
|
||||
if let Some(etag) = self.http_etag() {
|
||||
req.headers_mut()
|
||||
.insert(IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap());
|
||||
req = req.header(IF_NONE_MATCH, HeaderValue::from_str(etag).unwrap());
|
||||
}
|
||||
|
||||
if let Some(lmod) = self.last_modified() {
|
||||
req.headers_mut()
|
||||
.insert(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap());
|
||||
req = req.header(IF_MODIFIED_SINCE, HeaderValue::from_str(lmod).unwrap());
|
||||
}
|
||||
|
||||
client
|
||||
.request(req)
|
||||
req.send()
|
||||
.map_err(From::from)
|
||||
.and_then(move |res| self.match_status(res))
|
||||
}
|
||||
}
|
||||
|
||||
fn response_to_channel(
|
||||
res: Response<Body>,
|
||||
) -> impl Future<Item = Channel, Error = DataError> + Send {
|
||||
fn response_to_channel(res: Response) -> impl Future<Item = Channel, Error = DataError> + Send {
|
||||
res.into_body()
|
||||
.concat2()
|
||||
.map(|x| x.into_iter())
|
||||
@ -327,7 +274,6 @@ fn response_to_channel(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use failure::Error;
|
||||
use num_cpus;
|
||||
use tokio;
|
||||
|
||||
use crate::database::truncate_db;
|
||||
@ -338,8 +284,7 @@ mod tests {
|
||||
truncate_db()?;
|
||||
|
||||
let mut rt = tokio::runtime::Runtime::new()?;
|
||||
let https = HttpsConnector::new(num_cpus::get())?;
|
||||
let client = Client::builder().build::<_, Body>(https);
|
||||
let client = reqwest::r#async::Client::new();
|
||||
|
||||
let url = "https://web.archive.org/web/20180120083840if_/https://feeds.feedburner.\
|
||||
com/InterceptedWithJeremyScahill";
|
||||
|
||||
@ -23,26 +23,20 @@
|
||||
use futures::{future::ok, lazy, prelude::*, stream::FuturesUnordered};
|
||||
use tokio;
|
||||
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Body, Client};
|
||||
use hyper_tls::HttpsConnector;
|
||||
|
||||
use num_cpus;
|
||||
use reqwest::r#async::Client;
|
||||
|
||||
use crate::errors::DataError;
|
||||
use crate::Source;
|
||||
|
||||
use std::iter::FromIterator;
|
||||
|
||||
type HttpsClient = Client<HttpsConnector<HttpConnector>>;
|
||||
|
||||
/// The pipeline to be run for indexing and updating a Podcast feed that originates from
|
||||
/// `Source.uri`.
|
||||
///
|
||||
/// Messy temp diagram:
|
||||
/// Source -> GET Request -> Update Etags -> Check Status -> Parse `xml/Rss` ->
|
||||
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
|
||||
pub fn pipeline<'a, S>(sources: S, client: HttpsClient) -> impl Future<Item = (), Error = ()> + 'a
|
||||
pub fn pipeline<'a, S>(sources: S, client: Client) -> impl Future<Item = (), Error = ()> + 'a
|
||||
where
|
||||
S: Stream<Item = Source, Error = DataError> + Send + 'a,
|
||||
{
|
||||
@ -67,14 +61,14 @@ where
|
||||
.for_each(move |_| ok(()))
|
||||
}
|
||||
|
||||
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and
|
||||
/// runs the pipeline to completion. The `reactor::Core` is dropped afterwards.
|
||||
/// Creates a `tokio::runtime::Runtime`, and a `reqwest::async::Client` and
|
||||
/// runs the pipeline to completion. The Runtime and Client are dropped afterwards.
|
||||
/// FIXME: require the caller to handle the runtime and the client
|
||||
pub fn run<S>(sources: S) -> Result<(), DataError>
|
||||
where
|
||||
S: IntoIterator<Item = Source>,
|
||||
{
|
||||
let https = HttpsConnector::new(num_cpus::get())?;
|
||||
let client = Client::builder().build::<_, Body>(https);
|
||||
let client = Client::new();
|
||||
|
||||
let foo = sources.into_iter().map(ok::<_, _>);
|
||||
let stream = FuturesUnordered::from_iter(foo);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user