diff --git a/hammond-data/src/models/queryables.rs b/hammond-data/src/models/queryables.rs index 2557e88..863d51a 100644 --- a/hammond-data/src/models/queryables.rs +++ b/hammond-data/src/models/queryables.rs @@ -6,13 +6,17 @@ use reqwest; use diesel::SaveChangesDsl; use reqwest::header::{ETag, LastModified}; use rss::Channel; -use hyper; -// use futures::{future, Future, Stream}; -// use hyper::Client; -// use hyper::client::HttpConnector; -// use hyper::Method; -// use hyper::Uri; +use hyper; +use hyper::Client; +use hyper::client::HttpConnector; +use hyper::Method; +use hyper::Uri; +use hyper_tls::HttpsConnector; +// use hyper::header::{ETag, LastModified}; + +use futures::{Future, Stream}; +// use futures::future::join_all; use schema::{episode, podcast, source}; use feed::Feed; @@ -728,8 +732,65 @@ impl<'a> Source { Ok(Feed::from_channel_source(chan, self.id)) } + #[allow(dead_code)] + /// Docs + pub fn into_fututre_feed( + &'a mut self, + client: &'a mut Client>, + ignore_etags: bool, + ) -> Box + 'a> { + let id = *self.id(); + let feed = request_constructor(&self, client, ignore_etags) + .map(move |res| { + println!("Status: {}", res.status()); + self.update_etag2(&res).unwrap(); + res + }) + .and_then(move |res| response_to_channel(res)) + .map(move |chan| Feed::from_channel_source(chan, id)); + + Box::new(feed) + } + /// Construct a new `Source` with the given `uri` and index it. pub fn from_url(uri: &str) -> Result { NewSource::new_with_uri(uri).into_source() } } + +fn request_constructor( + s: &Source, + client: &mut Client>, + ignore_etags: bool, +) -> Box> { + use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch}; + + let uri = Uri::from_str(&s.uri()).unwrap(); + let mut req = hyper::Request::new(Method::Get, uri); + + if !ignore_etags { + if let Some(foo) = s.http_etag() { + req.headers_mut().set(IfNoneMatch::Items(vec![ + EntityTag::new(true, foo.to_owned()), + ])); + } + + if let Some(foo) = s.last_modified() { + if let Ok(x) = foo.parse::() { + req.headers_mut().set(IfModifiedSince(x)); + } + } + } + + let work = client.request(req); + Box::new(work) +} + +fn response_to_channel(res: hyper::Response) -> Box> { + let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| { + let utf_8_bytes = iter.collect::>(); + let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned(); + Channel::from_str(&buf).unwrap() + }); + Box::new(chan) +} diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index ebc6367..b9d4dab 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -1,72 +1,12 @@ -use rss; - -use hyper; -use hyper::Client; -use hyper::client::HttpConnector; -use hyper::Method; -use hyper::Uri; -use hyper_tls::HttpsConnector; -// use hyper::header::{ETag, LastModified}; -// use hyper::header::{ETag, LastModified}; - -use futures::{Future, Stream}; -// use futures::future::join_all; - -// use std::io::{self, Write}; -use std::str::FromStr; - -use Source; -// use errors::*; - -#[allow(dead_code)] -fn request_constructor( - s: &Source, - client: &mut Client>, - ignore_etags: bool, -) -> Box> { - use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch}; - - let uri = Uri::from_str(&s.uri()).unwrap(); - let mut req = hyper::Request::new(Method::Get, uri); - - if !ignore_etags { - if let Some(foo) = s.http_etag() { - req.headers_mut().set(IfNoneMatch::Items(vec![ - EntityTag::new(true, foo.to_owned()), - ])); - } - - if let Some(foo) = s.last_modified() { - if let Ok(x) = foo.parse::() { - req.headers_mut().set(IfModifiedSince(x)); - } - } - } - - let work = client.request(req); - Box::new(work) -} - -#[allow(dead_code)] -fn res_to_channel(res: hyper::Response) -> Box> { - let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| { - let utf_8_bytes = iter.collect::>(); - let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned(); - rss::Channel::from_str(&buf).unwrap() - }); - // .map_err(|_| ()); - Box::new(chan) -} - #[cfg(test)] mod tests { - use super::*; // use futures::future::result; use tokio_core::reactor::Core; + use hyper::Client; + use hyper_tls::HttpsConnector; use database::truncate_db; use Source; - // use feed::Feed; #[test] fn test_bar() { @@ -80,18 +20,10 @@ mod tests { let url = "https://feeds.feedburner.com/InterceptedWithJeremyScahill"; let mut source = Source::from_url(url).unwrap(); - let channel = request_constructor(&source, &mut client, false) - .map(|res| { - println!("Status: {}", res.status()); - source.update_etag2(&res).unwrap(); - res - }) - .and_then(|res| res_to_channel(res)); - // .map(|chan| Feed::from_channel_source(chan, source)); + let feed = source.into_fututre_feed(&mut client, false); - let chan = core.run(channel).unwrap(); + let f = core.run(feed).unwrap(); - // let c = chan.wait().unwrap(); - println!("{:?}", chan); + println!("{:?}", f); } }