From 050fe9c52bff361cb1ab2e3becc193e535c3d2ce Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Tue, 23 Jan 2018 11:43:37 +0200 Subject: [PATCH] Pipeline: offload more stuff to the threadpool. --- hammond-data/src/models/source.rs | 16 +++++++++++----- hammond-data/src/pipeline.rs | 6 +++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hammond-data/src/models/source.rs b/hammond-data/src/models/source.rs index 89d6c72..c2bdd76 100644 --- a/hammond-data/src/models/source.rs +++ b/hammond-data/src/models/source.rs @@ -9,6 +9,7 @@ use hyper_tls::HttpsConnector; // use futures::future::ok; use futures::prelude::*; +use futures_cpupool::CpuPool; use database::connection; use errors::*; @@ -109,6 +110,7 @@ impl Source { pub fn into_feed( mut self, client: &Client>, + pool: CpuPool, ignore_etags: bool, ) -> Box> { let id = self.id(); @@ -122,7 +124,7 @@ impl Source { match_status(res.status())?; Ok(res) }) - .and_then(response_to_channel) + .and_then(move |res| response_to_channel(res, pool)) .map(move |chan| { FeedBuilder::default() .channel(chan) @@ -164,7 +166,10 @@ impl Source { } } -fn response_to_channel(res: Response) -> Box + Send> { +fn response_to_channel( + res: Response, + pool: CpuPool, +) -> Box + Send> { let chan = res.body() .concat2() .map(|x| x.into_iter()) @@ -172,8 +177,8 @@ fn response_to_channel(res: Response) -> Box>()) .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned()) .and_then(|buf| Channel::from_str(&buf).map_err(From::from)); - - Box::new(chan) + let cpu_chan = pool.spawn(chan); + Box::new(cpu_chan) } // TODO match on more stuff @@ -216,6 +221,7 @@ mod tests { fn test_into_feed() { truncate_db().unwrap(); + let pool = CpuPool::new_num_cpus(); let mut core = Core::new().unwrap(); let client = Client::configure() .connector(HttpsConnector::new(4, &core.handle()).unwrap()) @@ -226,7 +232,7 @@ mod tests { let source = Source::from_url(url).unwrap(); let id = source.id(); - let feed = source.into_feed(&client, true); + let feed = source.into_feed(&client, pool.clone(), true); let feed = core.run(feed).unwrap(); let expected = get_feed("tests/feeds/2018-01-20-Intercepted.xml", id); diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs index 694b9dd..c1fee9a 100644 --- a/hammond-data/src/pipeline.rs +++ b/hammond-data/src/pipeline.rs @@ -44,7 +44,7 @@ macro_rules! clone { /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> /// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes. pub fn pipeline>(sources: S, ignore_etags: bool) -> Result<()> { - let _pool = CpuPool::new_num_cpus(); + let pool = CpuPool::new_num_cpus(); let mut core = Core::new()?; let handle = core.handle(); @@ -55,8 +55,8 @@ pub fn pipeline>(sources: S, ignore_etags: bool) let list = sources .into_iter() - .map(|s| s.into_feed(&client, ignore_etags)) - .map(|fut| fut.and_then(clone!(_pool => move |feed| _pool.clone().spawn(feed.index())))) + .map(clone!(pool => move |s| s.into_feed(&client, pool.clone(), ignore_etags))) + .map(|fut| fut.and_then(clone!(pool => move |feed| pool.clone().spawn(feed.index())))) .collect(); let f = core.run(collect_futures(list))?;