diff --git a/Cargo.lock b/Cargo.lock index a586c06..c4d3dc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,7 +1014,7 @@ dependencies = [ "indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1156,7 +1156,7 @@ dependencies = [ "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1181,7 +1181,7 @@ dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1788,7 +1788,7 @@ dependencies = [ "rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2503,7 +2503,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2512,6 +2512,7 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2627,7 +2628,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2640,7 +2641,7 @@ dependencies = [ "futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3099,7 +3100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" -"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" +"checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" "checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" "checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab" diff --git a/podcasts-data/Cargo.toml b/podcasts-data/Cargo.toml index 7063127..7e282b2 100644 --- a/podcasts-data/Cargo.toml +++ b/podcasts-data/Cargo.toml @@ -19,7 +19,6 @@ xml-rs = "0.8.0" futures = "0.3.4" hyper = "0.13.2" http = "0.2.0" -tokio = "0.2.11" hyper-tls = "0.4.1" native-tls = "0.2.3" num_cpus = "1.10.1" @@ -35,6 +34,10 @@ version = "1.4.3" features = ["sqlite"] version = "1.4.0" +[dependencies.tokio] +features = ["rt-core", "rt-threaded"] +version = "0.2.13" + [dev-dependencies] rand = "0.7.2" tempdir = "0.3.7" diff --git a/podcasts-data/src/feed.rs b/podcasts-data/src/feed.rs index d4cffc9..1920c58 100644 --- a/podcasts-data/src/feed.rs +++ b/podcasts-data/src/feed.rs @@ -45,7 +45,7 @@ pub struct Feed { impl Feed { /// Index the contents of the RSS `Feed` into the database. - pub async fn index(self) -> Result<(), DataError>{ + pub async fn index(self) -> Result<(), DataError> { let show = self.parse_podcast().to_podcast()?; self.index_channel_items(show).await } @@ -58,16 +58,14 @@ impl Feed { let stream = stream::iter(self.channel.into_items()); // Parse the episodes let episodes = stream.filter_map(move |item| { - let ret = NewEpisodeMinimal::new(&item, pd.id()) + let ret = NewEpisodeMinimal::new(&item, pd.id()) .and_then(move |ep| determine_ep_state(ep, &item)); if ret.is_ok() { future::ready(Some(ret)) - } - else { + } else { future::ready(None) - } - }); - + } + }); // Filter errors, Index updatable episodes, return insertables. filter_episodes(episodes) // Batch index insertable episodes. @@ -103,23 +101,25 @@ where S: Stream, DataError>> + Send + 'a, { stream - .try_filter_map(|state| async move { - match state { - IndexState::NotChanged => Ok(None), - // Update individual rows, and filter them - IndexState::Update((ref ep, rowid)) => { - ep.update(rowid) - .map_err(|err| error!("{}", err)) - .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) - .ok(); + .try_filter_map(|state| { + async move { + match state { + IndexState::NotChanged => Ok(None), + // Update individual rows, and filter them + IndexState::Update((ref ep, rowid)) => { + ep.update(rowid) + .map_err(|err| error!("{}", err)) + .map_err(|_| error!("Failed to index episode: {:?}.", ep.title())) + .ok(); - Ok(None) + Ok(None) + } + IndexState::Index(s) => Ok(Some(s)), } - IndexState::Index(s) => Ok(Some(s)), } }) // only Index is left, collect them for batch index - .try_collect() + .try_collect() } fn batch_insert_episodes(episodes: &[NewEpisode]) { @@ -146,9 +146,9 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) { #[cfg(test)] mod tests { use failure::Error; + use futures::executor::block_on; use rss::Channel; use tokio; - use futures::executor::block_on; use crate::database::truncate_db; use crate::dbqueries; diff --git a/podcasts-data/src/models/source.rs b/podcasts-data/src/models/source.rs index a1c29fa..63d9ed4 100644 --- a/podcasts-data/src/models/source.rs +++ b/podcasts-data/src/models/source.rs @@ -159,7 +159,7 @@ impl Source { let code = res.status(); if code.is_success() { - // If request is succesful save the etag + // If request is successful save the etag self = self.update_etag(&res)? } else { match code.as_u16() { @@ -189,7 +189,7 @@ impl Source { return Err(DataError::FeedRedirect(self)); } 401 => return Err(self.make_err("401: Unauthorized.", code)), - 403 => return Err(self.make_err("403: Forbidden.", code)), + 403 => return Err(self.make_err("403: Forbidden.", code)), 404 => return Err(self.make_err("404: Not found.", code)), 408 => return Err(self.make_err("408: Request Timeout.", code)), 410 => return Err(self.make_err("410: Feed was deleted..", code)), @@ -240,23 +240,24 @@ impl Source { self, client: Client>, ) -> Result { - let id = self.id(); let resp = self.get_response(&client).await?; let chan = response_to_channel(resp).await?; - + FeedBuilder::default() .channel(chan) .source_id(id) .build() - .map_err(From::from) + .map_err(From::from) } - async fn get_response(self, client: &Client>) -> Result, DataError> { + async fn get_response( + self, + client: &Client>, + ) -> Result, DataError> { let mut source = self; - loop - { + loop { match source.request_constructor(&client.clone()).await { Ok(response) => return Ok(response), Err(err) => match err { @@ -267,7 +268,7 @@ impl Source { e => return Err(e), }, } - }; + } } async fn request_constructor( @@ -305,14 +306,12 @@ impl Source { } let res = client.request(req).await?; - //.map_err(From::from) + //.map_err(From::from) self.match_status(res) } } -async fn response_to_channel( - res: Response, -) -> Result { +async fn response_to_channel(res: Response) -> Result { let chunk = hyper::body::to_bytes(res.into_body()).await?; let buf = String::from_utf8_lossy(&chunk).into_owned(); Channel::from_str(&buf).map_err(From::from) diff --git a/podcasts-data/src/pipeline.rs b/podcasts-data/src/pipeline.rs index 81d7d46..9129ea3 100644 --- a/podcasts-data/src/pipeline.rs +++ b/podcasts-data/src/pipeline.rs @@ -20,7 +20,7 @@ // FIXME: //! Docs. -use futures::{future::ok, future::lazy, prelude::*, stream::FuturesUnordered}; +use futures::{future::ok, prelude::*, stream::FuturesUnordered}; use tokio; use hyper::client::HttpConnector; @@ -42,18 +42,18 @@ type HttpsClient = Client>; /// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes. pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient) where - S: Stream> + Send + 'a + std::marker::Unpin + S: Stream> + Send + 'a + std::marker::Unpin, { while let Some(source_result) = sources.next().await { if let Ok(source) = source_result { match source.into_feed(client.clone()).await { Ok(feed) => { - let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err))); + let fut = feed.index().map_err(|err| error!("Error: {}", err)); tokio::spawn(fut); - }, + } // Avoid spamming the stderr when it's not an actual error Err(DataError::FeedNotModified(_)) => (), - Err(err) => error!("Error: {}", err), + Err(err) => error!("Error: {}", err), }; } }