diff --git a/hammond-data/src/lib.rs b/hammond-data/src/lib.rs index 96a190d..047d265 100644 --- a/hammond-data/src/lib.rs +++ b/hammond-data/src/lib.rs @@ -19,7 +19,8 @@ unused_parens, while_true)] #![deny(missing_debug_implementations, missing_docs, trivial_casts, trivial_numeric_casts)] // FIXME: uncomment -// unused_extern_crates, unused)] +// #![deny(unused_extern_crates, unused)] + // #![feature(conservative_impl_trait)] #[macro_use] @@ -59,15 +60,15 @@ extern crate xdg; #[allow(missing_docs)] pub mod dbqueries; -pub mod utils; -pub mod feed; #[allow(missing_docs)] pub mod errors; +pub mod utils; +pub mod feed; pub mod database; +pub mod pipeline; pub(crate) mod models; mod parser; mod schema; -pub mod pipeline; pub use models::queryables::{Episode, EpisodeWidgetQuery, Podcast, PodcastCoverQuery, Source}; diff --git a/hammond-data/src/models/insertables.rs b/hammond-data/src/models/insertables.rs index e66f54f..01d2e10 100644 --- a/hammond-data/src/models/insertables.rs +++ b/hammond-data/src/models/insertables.rs @@ -39,7 +39,7 @@ impl Insert for NewSource { } impl NewSource { - pub(crate) fn new_with_uri(uri: &str) -> NewSource { + pub(crate) fn new(uri: &str) -> NewSource { NewSource { uri: uri.trim().to_string(), last_modified: None, diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs index b679d03..b8eef3c 100644 --- a/hammond-data/src/models/queryables.rs +++ b/hammond-data/src/models/queryables.rs @@ -4,16 +4,12 @@ use diesel; use reqwest; use diesel::SaveChangesDsl; -use reqwest::header::{ETag, LastModified}; use rss::Channel; -use hyper; -use hyper::Client; use hyper::client::HttpConnector; -use hyper::Method; -use hyper::Uri; +use hyper::{Client, Method, Request, Response, StatusCode, Uri}; +use hyper::header::{ETag, EntityTag, HttpDate, IfModifiedSince, IfNoneMatch, LastModified}; use hyper_tls::HttpsConnector; -// use hyper::header::{ETag, LastModified}; use futures::prelude::*; // use futures::future::ok; @@ -639,10 +635,10 @@ impl Source { Ok(self.save_changes::(&*tempdb)?) } - /// Extract Etag and LastModifier from req, and update self and the + /// Extract Etag and LastModifier from res, and update self and the /// corresponding db row. - fn update_etag(&mut self, req: &reqwest::Response) -> Result<()> { - let headers = req.headers(); + fn update_etag(&mut self, res: &reqwest::Response) -> Result<()> { + let headers = res.headers(); let etag = headers.get::(); let lmod = headers.get::(); @@ -658,9 +654,10 @@ impl Source { Ok(()) } - /// Docs - pub fn update_etag2(mut self, req: &hyper::Response) -> Result<()> { - let headers = req.headers(); + /// Extract Etag and LastModifier from res, and update self and the + /// corresponding db row. + fn update_etag2(mut self, res: &Response) -> Result<()> { + let headers = res.headers(); let etag = headers.get::(); let lmod = headers.get::(); @@ -686,7 +683,6 @@ impl Source { // TODO: Refactor into TryInto once it lands on stable. pub fn into_feed(&mut self, ignore_etags: bool) -> Result { use reqwest::header::{EntityTag, Headers, HttpDate, IfModifiedSince, IfNoneMatch}; - use reqwest::StatusCode; let mut headers = Headers::new(); @@ -705,44 +701,20 @@ impl Source { } let client = reqwest::Client::builder().referer(false).build()?; - let mut req = client.get(self.uri()).headers(headers).send()?; + let mut res = client.get(self.uri()).headers(headers).send()?; - info!("GET to {} , returned: {}", self.uri(), req.status()); + info!("GET to {} , returned: {}", self.uri(), res.status()); - self.update_etag(&req)?; - - // TODO match on more stuff - // 301: Moved Permanently - // 304: Up to date Feed, checked with the Etag - // 307: Temporary redirect of the url - // 308: Permanent redirect of the url - // 401: Unathorized - // 403: Forbidden - // 408: Timeout - // 410: Feed deleted - match req.status() { - StatusCode::NotModified => bail!("304: skipping.."), - StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."), - // TODO: Change the source uri to the new one - StatusCode::MovedPermanently | StatusCode::PermanentRedirect => { - warn!("Feed was moved permanently.") - } - StatusCode::Unauthorized => bail!("401: Unauthorized."), - StatusCode::Forbidden => bail!("403: Forbidden."), - StatusCode::NotFound => bail!("404: Not found."), - StatusCode::RequestTimeout => bail!("408: Request Timeout."), - StatusCode::Gone => bail!("410: Feed was deleted."), - _ => (), - }; + self.update_etag(&res)?; + match_status(res.status())?; let mut buf = String::new(); - req.read_to_string(&mut buf)?; + res.read_to_string(&mut buf)?; let chan = Channel::from_str(&buf)?; Ok(Feed::from_channel_source(chan, self.id)) } - #[allow(dead_code)] /// Docs pub fn into_fututre_feed( self, @@ -750,6 +722,8 @@ impl Source { ignore_etags: bool, ) -> Box> { let id = self.id(); + // TODO: make URI future + // TODO: make a status match future let feed = request_constructor(&self, client, ignore_etags) .map(move |res| { if let Err(err) = self.update_etag2(&res) { @@ -766,21 +740,23 @@ impl Source { } /// Construct a new `Source` with the given `uri` and index it. + /// + /// This only indexes the `Source` struct, not the Podcast Feed. pub fn from_url(uri: &str) -> Result { - NewSource::new_with_uri(uri).into_source() + NewSource::new(uri).into_source() } } +// TODO: make ignore_etags an Enum for better ergonomics. +// #bools_are_just_2variant_enmus fn request_constructor( s: &Source, client: &Client>, ignore_etags: bool, -) -> Box> { - use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch}; - +) -> Box> { // FIXME: remove unwrap let uri = Uri::from_str(&s.uri()).unwrap(); - let mut req = hyper::Request::new(Method::Get, uri); + let mut req = Request::new(Method::Get, uri); if !ignore_etags { if let Some(foo) = s.http_etag() { @@ -800,7 +776,7 @@ fn request_constructor( Box::new(work) } -fn response_to_channel(res: hyper::Response) -> Box> { +fn response_to_channel(res: Response) -> Box> { let chan = res.body() .concat2() .map(|x| x.into_iter()) @@ -814,6 +790,33 @@ fn response_to_channel(res: hyper::Response) -> Box Result<()> { + match code { + StatusCode::NotModified => bail!("304: skipping.."), + StatusCode::TemporaryRedirect => debug!("307: Temporary Redirect."), + // TODO: Change the source uri to the new one + StatusCode::MovedPermanently | StatusCode::PermanentRedirect => { + warn!("Feed was moved permanently.") + } + StatusCode::Unauthorized => bail!("401: Unauthorized."), + StatusCode::Forbidden => bail!("403: Forbidden."), + StatusCode::NotFound => bail!("404: Not found."), + StatusCode::RequestTimeout => bail!("408: Request Timeout."), + StatusCode::Gone => bail!("410: Feed was deleted."), + _ => (), + }; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index d2cb80b..bf51449 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -1,13 +1,13 @@ //! Docs. -use futures::future::*; use errors::Error; use Source; use tokio_core::reactor::Core; use hyper::Client; use hyper_tls::HttpsConnector; -// use futures::future::*; +use futures::prelude::*; +use futures::future::*; // Weird magic from #rust irc channel // kudos to remexre @@ -42,8 +42,13 @@ mod dirtyhack { use super::*; use errors::*; - /// Docs - pub fn pipeline>(sources: S) -> Result<()> { + /// The pipline to be run for indexing and updating a Podcast feed that originates from + /// `Source.uri`. + /// + /// Messy temp diagram: + /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> + /// Convert rss::Channel into Feed -> Index Podcast -> Index Episodes. + pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() @@ -53,7 +58,9 @@ mod dirtyhack { let list = sources .into_iter() - .map(|s| s.into_fututre_feed(&client, false).map(|feed| feed.index())) + // FIXME: Make proper indexing futures instead of wrapping up existing + // blocking functions + .map(|s| s.into_fututre_feed(&client, ignore_etags).map(|feed| feed.index())) .collect(); let f = core.run(collect_futures(list))?; diff --git a/hammond-gtk/src/utils.rs b/hammond-gtk/src/utils.rs index 74c1446..207cb79 100644 --- a/hammond-gtk/src/utils.rs +++ b/hammond-gtk/src/utils.rs @@ -19,18 +19,22 @@ use app::Action; /// If `source` is None, Fetches all the `Source` entries in the database and updates them. /// When It's done,it queues up a `RefreshViews` action. pub fn refresh_feed(headerbar: Arc
, source: Option>, sender: Sender) { + // TODO: make it an application channel action. + // I missed it before apparently. headerbar.show_update_notification(); thread::spawn(move || { + // FIXME: This is messy at best. if let Some(s) = source { // feed::index_loop(s); - if let Err(err) = pipeline::pipeline(s) { + // TODO: determine if it needs to ignore_etags. + if let Err(err) = pipeline::pipeline(s, true) { error!("Error While trying to update the database."); error!("Error msg: {}", err); } } else { let sources = dbqueries::get_sources().unwrap(); - if let Err(err) = pipeline::pipeline(sources) { + if let Err(err) = pipeline::pipeline(sources, false) { error!("Error While trying to update the database."); error!("Error msg: {}", err); }