This works somehow...

This commit is contained in:
Jordan Petridis 2018-01-13 07:09:59 +02:00
parent ee9cede921
commit 6517956987
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
2 changed files with 72 additions and 79 deletions

View File

@ -6,13 +6,17 @@ use reqwest;
use diesel::SaveChangesDsl; use diesel::SaveChangesDsl;
use reqwest::header::{ETag, LastModified}; use reqwest::header::{ETag, LastModified};
use rss::Channel; use rss::Channel;
use hyper;
// use futures::{future, Future, Stream}; use hyper;
// use hyper::Client; use hyper::Client;
// use hyper::client::HttpConnector; use hyper::client::HttpConnector;
// use hyper::Method; use hyper::Method;
// use hyper::Uri; use hyper::Uri;
use hyper_tls::HttpsConnector;
// use hyper::header::{ETag, LastModified};
use futures::{Future, Stream};
// use futures::future::join_all;
use schema::{episode, podcast, source}; use schema::{episode, podcast, source};
use feed::Feed; use feed::Feed;
@ -728,8 +732,65 @@ impl<'a> Source {
Ok(Feed::from_channel_source(chan, self.id)) Ok(Feed::from_channel_source(chan, self.id))
} }
#[allow(dead_code)]
/// Docs
pub fn into_fututre_feed(
&'a mut self,
client: &'a mut Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = Feed, Error = hyper::Error> + 'a> {
let id = *self.id();
let feed = request_constructor(&self, client, ignore_etags)
.map(move |res| {
println!("Status: {}", res.status());
self.update_etag2(&res).unwrap();
res
})
.and_then(move |res| response_to_channel(res))
.map(move |chan| Feed::from_channel_source(chan, id));
Box::new(feed)
}
/// Construct a new `Source` with the given `uri` and index it. /// Construct a new `Source` with the given `uri` and index it.
pub fn from_url(uri: &str) -> Result<Source> { pub fn from_url(uri: &str) -> Result<Source> {
NewSource::new_with_uri(uri).into_source() NewSource::new_with_uri(uri).into_source()
} }
} }
fn request_constructor(
s: &Source,
client: &mut Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = hyper::Response, Error = hyper::Error>> {
use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch};
let uri = Uri::from_str(&s.uri()).unwrap();
let mut req = hyper::Request::new(Method::Get, uri);
if !ignore_etags {
if let Some(foo) = s.http_etag() {
req.headers_mut().set(IfNoneMatch::Items(vec![
EntityTag::new(true, foo.to_owned()),
]));
}
if let Some(foo) = s.last_modified() {
if let Ok(x) = foo.parse::<HttpDate>() {
req.headers_mut().set(IfModifiedSince(x));
}
}
}
let work = client.request(req);
Box::new(work)
}
fn response_to_channel(res: hyper::Response) -> Box<Future<Item = Channel, Error = hyper::Error>> {
let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| {
let utf_8_bytes = iter.collect::<Vec<u8>>();
let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
Channel::from_str(&buf).unwrap()
});
Box::new(chan)
}

View File

@ -1,72 +1,12 @@
use rss;
use hyper;
use hyper::Client;
use hyper::client::HttpConnector;
use hyper::Method;
use hyper::Uri;
use hyper_tls::HttpsConnector;
// use hyper::header::{ETag, LastModified};
// use hyper::header::{ETag, LastModified};
use futures::{Future, Stream};
// use futures::future::join_all;
// use std::io::{self, Write};
use std::str::FromStr;
use Source;
// use errors::*;
#[allow(dead_code)]
fn request_constructor(
s: &Source,
client: &mut Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool,
) -> Box<Future<Item = hyper::Response, Error = hyper::Error>> {
use hyper::header::{EntityTag, HttpDate, IfModifiedSince, IfNoneMatch};
let uri = Uri::from_str(&s.uri()).unwrap();
let mut req = hyper::Request::new(Method::Get, uri);
if !ignore_etags {
if let Some(foo) = s.http_etag() {
req.headers_mut().set(IfNoneMatch::Items(vec![
EntityTag::new(true, foo.to_owned()),
]));
}
if let Some(foo) = s.last_modified() {
if let Ok(x) = foo.parse::<HttpDate>() {
req.headers_mut().set(IfModifiedSince(x));
}
}
}
let work = client.request(req);
Box::new(work)
}
#[allow(dead_code)]
fn res_to_channel(res: hyper::Response) -> Box<Future<Item = rss::Channel, Error = hyper::Error>> {
let chan = res.body().concat2().map(|x| x.into_iter()).map(|iter| {
let utf_8_bytes = iter.collect::<Vec<u8>>();
let buf = String::from_utf8_lossy(&utf_8_bytes).into_owned();
rss::Channel::from_str(&buf).unwrap()
});
// .map_err(|_| ());
Box::new(chan)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
// use futures::future::result; // use futures::future::result;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use hyper::Client;
use hyper_tls::HttpsConnector;
use database::truncate_db; use database::truncate_db;
use Source; use Source;
// use feed::Feed;
#[test] #[test]
fn test_bar() { fn test_bar() {
@ -80,18 +20,10 @@ mod tests {
let url = "https://feeds.feedburner.com/InterceptedWithJeremyScahill"; let url = "https://feeds.feedburner.com/InterceptedWithJeremyScahill";
let mut source = Source::from_url(url).unwrap(); let mut source = Source::from_url(url).unwrap();
let channel = request_constructor(&source, &mut client, false) let feed = source.into_fututre_feed(&mut client, false);
.map(|res| {
println!("Status: {}", res.status());
source.update_etag2(&res).unwrap();
res
})
.and_then(|res| res_to_channel(res));
// .map(|chan| Feed::from_channel_source(chan, source));
let chan = core.run(channel).unwrap(); let f = core.run(feed).unwrap();
// let c = chan.wait().unwrap(); println!("{:?}", f);
println!("{:?}", chan);
} }
} }