This sort of works?

This commit is contained in:
Jordan Petridis 2018-01-23 10:12:32 +02:00
parent f7f6087d70
commit 8095919bbe
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
6 changed files with 25 additions and 5 deletions

1
Cargo.lock generated
View File

@ -618,6 +618,7 @@ dependencies = [
"dotenv 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.14 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.14 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -24,6 +24,7 @@ hyper = "0.11.14"
tokio-core = "0.1.12" tokio-core = "0.1.12"
hyper-tls = "0.1.2" hyper-tls = "0.1.2"
native-tls = "0.1.5" native-tls = "0.1.5"
futures-cpupool = "0.1.8"
[dependencies.diesel] [dependencies.diesel]
features = ["sqlite", "r2d2"] features = ["sqlite", "r2d2"]

View File

@ -59,7 +59,7 @@ static URLS: &[(&[u8], &str)] = &[
), ),
]; ];
fn index_urls() -> Vec<Box<Future<Item = (), Error = Error>>> { fn index_urls() -> Vec<Box<Future<Item = (), Error = Error> + Send>> {
let feeds: Vec<_> = URLS.iter() let feeds: Vec<_> = URLS.iter()
.map(|&(buff, url)| { .map(|&(buff, url)| {
// Create and insert a Source into db // Create and insert a Source into db

View File

@ -53,6 +53,7 @@ impl Feed {
}) })
.map(|(_, update)| { .map(|(_, update)| {
if !update.is_empty() { if !update.is_empty() {
info!("Updating {} episodes.", update.len());
// see get_stuff for more // see get_stuff for more
update update
.into_iter() .into_iter()

View File

@ -39,7 +39,7 @@ extern crate log;
extern crate ammonia; extern crate ammonia;
extern crate chrono; extern crate chrono;
extern crate futures; extern crate futures;
// extern crate futures_cpupool; extern crate futures_cpupool;
extern crate hyper; extern crate hyper;
extern crate hyper_tls; extern crate hyper_tls;
extern crate itertools; extern crate itertools;

View File

@ -2,7 +2,7 @@
//! Docs. //! Docs.
use futures::future::*; use futures::future::*;
// use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
// use futures::prelude::*; // use futures::prelude::*;
use hyper::Client; use hyper::Client;
@ -20,6 +20,23 @@ use models::{IndexState, NewEpisode, NewEpisodeMinimal};
use std; use std;
// use std::sync::{Arc, Mutex}; // use std::sync::{Arc, Mutex};
macro_rules! clone {
(@param _) => ( _ );
(@param $x:ident) => ( $x );
($($n:ident),+ => move || $body:expr) => (
{
$( let $n = $n.clone(); )+
move || $body
}
);
($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
{
$( let $n = $n.clone(); )+
move |$(clone!(@param $p),)+| $body
}
);
}
/// The pipline to be run for indexing and updating a Podcast feed that originates from /// The pipline to be run for indexing and updating a Podcast feed that originates from
/// `Source.uri`. /// `Source.uri`.
/// ///
@ -27,7 +44,7 @@ use std;
/// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss -> /// Source -> GET Request -> Update Etags -> Check Status -> Parse xml/Rss ->
/// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes. /// Convert `rss::Channel` into Feed -> Index Podcast -> Index Episodes.
pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> { pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool) -> Result<()> {
// let _pool = CpuPool::new_num_cpus(); let _pool = CpuPool::new_num_cpus();
let mut core = Core::new()?; let mut core = Core::new()?;
let handle = core.handle(); let handle = core.handle();
@ -39,7 +56,7 @@ pub fn pipeline<S: IntoIterator<Item = Source>>(sources: S, ignore_etags: bool)
let list = sources let list = sources
.into_iter() .into_iter()
.map(|s| s.into_feed(&client, ignore_etags)) .map(|s| s.into_feed(&client, ignore_etags))
.map(|fut| fut.and_then(|feed| feed.index())) .map(|fut| fut.and_then(clone!(_pool => move |feed| _pool.clone().spawn(feed.index()))))
.collect(); .collect();
let f = core.run(collect_futures(list))?; let f = core.run(collect_futures(list))?;