Pipeline: Use a custom tokio threadpool

This reverts commit e64883eecb
and 40dd2d6923

Seems like core.run() returns once its done even if there
are still tasks in the Runtime underneath. A way to solve that
would be to call the shutdown_on_idle method.

We need ownership of the threadpool in order to invoke
`shutdown_on_idle` method but core.runtime only returns a
referrence so we need to create our own threadpool.
This commit is contained in:
Jordan Petridis 2018-09-03 19:49:51 +03:00
parent fd77d672c5
commit dac303e33b
No known key found for this signature in database
GPG Key ID: E8523968931763BE
5 changed files with 29 additions and 12 deletions

3
Cargo.lock generated
View File

@ -1504,8 +1504,9 @@ dependencies = [
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -18,8 +18,9 @@ xdg = "2.1.0"
xml-rs = "0.8.0" xml-rs = "0.8.0"
futures = "0.1.23" futures = "0.1.23"
hyper = "0.11.27" hyper = "0.11.27"
tokio = "0.1.8"
tokio-core = "0.1.17" tokio-core = "0.1.17"
tokio-threadpool = "0.1.6"
tokio-executor = "0.1"
hyper-tls = "0.1.3" hyper-tls = "0.1.3"
native-tls = "0.1.5" native-tls = "0.1.5"
num_cpus = "1.8.0" num_cpus = "1.8.0"

View File

@ -4,6 +4,7 @@ use diesel_migrations::RunMigrationsError;
use hyper; use hyper;
use native_tls; use native_tls;
use rss; use rss;
use tokio_executor;
use url; use url;
use xml; use xml;
@ -46,6 +47,8 @@ pub enum DataError {
R2D2PoolError(#[cause] r2d2::PoolError), R2D2PoolError(#[cause] r2d2::PoolError),
#[fail(display = "Hyper Error: {}", _0)] #[fail(display = "Hyper Error: {}", _0)]
HyperError(#[cause] hyper::Error), HyperError(#[cause] hyper::Error),
#[fail(display = "Tokio Spawn Error: {}", _0)]
SpawnError(#[cause] tokio_executor::SpawnError),
#[fail(display = "Failed to parse a url: {}", _0)] #[fail(display = "Failed to parse a url: {}", _0)]
UrlError(#[cause] url::ParseError), UrlError(#[cause] url::ParseError),
#[fail(display = "TLS Error: {}", _0)] #[fail(display = "TLS Error: {}", _0)]
@ -101,6 +104,12 @@ impl From<hyper::Error> for DataError {
} }
} }
impl From<tokio_executor::SpawnError> for DataError {
fn from(err: tokio_executor::SpawnError) -> Self {
DataError::SpawnError(err)
}
}
impl From<url::ParseError> for DataError { impl From<url::ParseError> for DataError {
fn from(err: url::ParseError) -> Self { fn from(err: url::ParseError) -> Self {
DataError::UrlError(err) DataError::UrlError(err)

View File

@ -82,8 +82,9 @@ extern crate num_cpus;
extern crate rayon; extern crate rayon;
extern crate rfc822_sanitizer; extern crate rfc822_sanitizer;
extern crate rss; extern crate rss;
extern crate tokio;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_threadpool;
extern crate url; extern crate url;
extern crate xdg; extern crate xdg;
extern crate xml; extern crate xml;

View File

@ -1,14 +1,15 @@
// FIXME: // FIXME:
//! Docs. //! Docs.
use futures::future::*;
use futures::prelude::*; use futures::prelude::*;
use futures::{future::ok, lazy, stream::iter_ok}; use futures::stream::*;
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
use hyper::Client; use hyper::Client;
use hyper_tls::HttpsConnector; use hyper_tls::HttpsConnector;
use tokio::runtime::TaskExecutor;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_threadpool::{self, ThreadPool};
use num_cpus; use num_cpus;
@ -26,7 +27,7 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
pub fn pipeline<'a, S>( pub fn pipeline<'a, S>(
sources: S, sources: S,
client: HttpsClient, client: HttpsClient,
executor: TaskExecutor, pool: tokio_threadpool::Sender,
) -> impl Future<Item = Vec<()>, Error = DataError> + 'a ) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where where
S: Stream<Item = Source, Error = DataError> + 'a, S: Stream<Item = Source, Error = DataError> + 'a,
@ -34,9 +35,9 @@ where
sources sources
.and_then(move |s| s.into_feed(client.clone())) .and_then(move |s| s.into_feed(client.clone()))
.and_then(move |feed| { .and_then(move |feed| {
let fut = lazy(|| feed.index().map_err(|err| error!("Error: {}", err))); pool.spawn(lazy(|| {
executor.spawn(fut); feed.index().map_err(|err| error!("Error: {}", err))
Ok(()) })).map_err(From::from)
}) })
// the stream will stop at the first error so // the stream will stop at the first error so
// we ensure that everything will succeded regardless. // we ensure that everything will succeded regardless.
@ -51,16 +52,20 @@ pub fn run<S>(sources: S) -> Result<(), DataError>
where where
S: IntoIterator<Item = Source>, S: IntoIterator<Item = Source>,
{ {
let pool = ThreadPool::new();
let sender = pool.sender().clone();
let mut core = Core::new()?; let mut core = Core::new()?;
let executor = core.runtime().executor();
let handle = core.handle(); let handle = core.handle();
let client = Client::configure() let client = Client::configure()
.connector(HttpsConnector::new(num_cpus::get(), &handle)?) .connector(HttpsConnector::new(num_cpus::get(), &handle)?)
.build(&handle); .build(&handle);
let stream = iter_ok::<_, DataError>(sources); let stream = iter_ok::<_, DataError>(sources);
let p = pipeline(stream, client, executor); let p = pipeline(stream, client, sender);
core.run(p).map(|_| ()) core.run(p)?;
pool.shutdown_on_idle().wait().unwrap();
Ok(())
} }
#[cfg(test)] #[cfg(test)]