Pipeline: offload more stuff to the threadpool.

This commit is contained in:
Jordan Petridis 2018-01-23 11:43:37 +02:00
parent a8c49049dd
commit 050fe9c52b
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
2 changed files with 14 additions and 8 deletions

View File

@ -9,6 +9,7 @@ use hyper_tls::HttpsConnector;
// use futures::future::ok;
use futures::prelude::*;
use futures_cpupool::CpuPool;
use database::connection;
use errors::*;
@ -109,6 +110,7 @@ impl Source {
pub fn into_feed(
mut self,
client: &Client<HttpsConnector<HttpConnector>>,
pool: CpuPool,
ignore_etags: bool,
) -> Box<Future<Item = Feed, Error = Error>> {
let id = self.id();
@ -122,7 +124,7 @@ impl Source {
match_status(res.status())?;
Ok(res)
})
.and_then(response_to_channel)
.and_then(move |res| response_to_channel(res, pool))
.map(move |chan| {
FeedBuilder::default()
.channel(chan)
@ -164,7 +166,10 @@ impl Source {
}
}
fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = Error> + Send> {
fn response_to_channel(
res: Response,
pool: CpuPool,
) -> Box<Future<Item = Channel, Error = Error> + Send> {
let chan = res.body()
.concat2()
.map(|x| x.into_iter())
@ -172,8 +177,8 @@ fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = Erro
.map(|iter| iter.collect::<Vec<u8>>())
.map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
.and_then(|buf| Channel::from_str(&buf).map_err(From::from));
Box::new(chan)
let cpu_chan = pool.spawn(chan);
Box::new(cpu_chan)
}
// TODO match on more stuff
@ -216,6 +221,7 @@ mod tests {
fn test_into_feed() {
truncate_db().unwrap();
let pool = CpuPool::new_num_cpus();
let mut core = Core::new().unwrap();
let client = Client::configure()
.connector(HttpsConnector::new(4, &core.handle()).unwrap())
@ -226,7 +232,7 @@ mod tests {
let source = Source::from_url(url).unwrap();
let id = source.id();
let feed = source.into_feed(&client, true);
let feed = source.into_feed(&client, pool.clone(), true);
let feed = core.run(feed).unwrap();
let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id);

View File

@ -44,7 +44,7 @@ macro_rules! clone {
/// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss ->
/// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes.
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> {
let _pool = CpuPool::new_num_cpus();
let pool = CpuPool::new_num_cpus();
let mut core = Core::new()?;
let handle = core.handle();
@ -55,8 +55,8 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
let list = sources
.into_iter()
.map(|s| s.into_feed(&client, ignore_etags))
.map(|fut| fut.and_then(clone!(_pool => move |feed| _pool.clone().spawn(feed.index()))))
.map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags)))
.map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index()))))
.collect();
let f = core.run(collect_futures(list))?;