diff --git a/Cargo.lock b/Cargo.lock index 0c3b1e1..3f3e44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1505,6 +1505,8 @@ dependencies = [ "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 845df5c..a96aa67 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -19,6 +19,8 @@ xml-rs = "0.8.0" futures = "0.1.23" hyper = "0.11.27" tokio-core = "0.1.17" +tokio-threadpool = "0.1.6" +tokio-executor = "0.1" hyper-tls = "0.1.3" native-tls = "0.1.5" num_cpus = "1.8.0" diff --git a/podcasts-data/src/errors.rs b/podcasts-data/src/errors.rs index 64b20c9..5e6e2ec 100644 --- a/podcasts-data/src/errors.rs +++ b/podcasts-data/src/errors.rs @@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError; use hyper; use native_tls; use rss; +use tokio_executor; use url; use xml; @@ -46,6 +47,8 @@ pub enum DataError { R2D2PoolError(#[cause] r2d2::PoolError), #[fail(display = "Hyper Error: {}", _0)] HyperError(#[cause] hyper::Error), + #[fail(display = "Tokio Spawn Error: {}", _0)] + SpawnError(#[cause] tokio_executor::SpawnError), #[fail(display = "Failed to parse a url: {}", _0)] UrlError(#[cause] url::ParseError), #[fail(display = "TLS Error: {}", _0)] @@ -101,6 +104,12 @@ impl From for DataError { } } +impl From for DataError { + fn from(err: tokio_executor::SpawnError) -> Self { + DataError::SpawnError(err) + } +} + impl From for DataError { fn from(err: url::ParseError) -> Self { DataError::UrlError(err) diff --git a/podcasts-data/src/lib.rs b/podcasts-data/src/lib.rs index b0f4a48..3654627 100644 --- a/podcasts-data/src/lib.rs +++ b/podcasts-data/src/lib.rs @@ -83,6 +83,8 @@ extern crate rayon; extern crate rfc822_sanitizer; extern crate rss; extern crate tokio_core; +extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; extern crate xdg; extern crate xml; diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 98e224e..6048027 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -9,6 +9,7 @@ use hyper::client::HttpConnector; use hyper::Client; use hyper_tls::HttpsConnector; use tokio_core::reactor::Core; +use tokio_threadpool::{self, ThreadPool}; use num_cpus; @@ -47,13 +48,18 @@ type HttpsClient = Client>; pub fn pipeline<'a, S>( sources: S, client: &HttpsClient, + pool: &tokio_threadpool::Sender, ) -> impl Future, Error = DataError> + 'a where S: Stream + 'a, { sources .and_then(clone!(client => move |s| s.into_feed(client.clone()))) - .and_then(|feed| feed.index()) + .and_then(clone!(pool => move |feed| { + pool.spawn(lazy(|| { + feed.index().map_err(|err| error!("Error: {}", err)) + })).map_err(From::from) + })) // the stream will stop at the first error so // we ensure that everything will succeded regardless. .map_err(|err| error!("Error: {}", err)) @@ -67,6 +73,8 @@ pub fn run(sources: S) -> Result<(), DataError> where S: IntoIterator, { + let pool = ThreadPool::new(); + let sender = pool.sender(); let mut core = Core::new()?; let handle = core.handle(); let client = Client::configure() @@ -74,7 +82,7 @@ where .build(&handle); let stream = iter_ok::<_, DataError>(sources); - let p = pipeline(stream, &client); + let p = pipeline(stream, &client, &sender); core.run(p).map(|_| ()) }