Add tokio features and remove lazy keyword

This commit is contained in:
Julian Hofer 2020-03-01 18:55:05 +01:00 committed by Jordan Petridis
parent e830589e38
commit 636e2aefde
5 changed files with 50 additions and 47 deletions

17
Cargo.lock generated
View File

@ -1014,7 +1014,7 @@ dependencies = [
"indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1156,7 +1156,7 @@ dependencies = [
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1181,7 +1181,7 @@ dependencies = [
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)",
"native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1788,7 +1788,7 @@ dependencies = [
"rfc822_sanitizer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2503,7 +2503,7 @@ dependencies = [
[[package]]
name = "tokio"
version = "0.2.11"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2512,6 +2512,7 @@ dependencies = [
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2627,7 +2628,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -2640,7 +2641,7 @@ dependencies = [
"futures-sink 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -3099,7 +3100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
"checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"
"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b"
"checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616"
"checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46"
"checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443"
"checksum tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab"

View File

@ -19,7 +19,6 @@ xml-rs = "0.8.0"
futures = "0.3.4"
hyper = "0.13.2"
http = "0.2.0"
tokio = "0.2.11"
hyper-tls = "0.4.1"
native-tls = "0.2.3"
num_cpus = "1.10.1"
@ -35,6 +34,10 @@ version = "1.4.3"
features = ["sqlite"]
version = "1.4.0"
[dependencies.tokio]
features = ["rt-core", "rt-threaded"]
version = "0.2.13"
[dev-dependencies]
rand = "0.7.2"
tempdir = "0.3.7"

View File

@ -45,7 +45,7 @@ pub struct Feed {
impl Feed {
/// Index the contents of the RSS `Feed` into the database.
pub async fn index(self) -> Result<(), DataError>{
pub async fn index(self) -> Result<(), DataError> {
let show = self.parse_podcast().to_podcast()?;
self.index_channel_items(show).await
}
@ -58,16 +58,14 @@ impl Feed {
let stream = stream::iter(self.channel.into_items());
// Parse the episodes
let episodes = stream.filter_map(move |item| {
let ret = NewEpisodeMinimal::new(&item, pd.id())
let ret = NewEpisodeMinimal::new(&item, pd.id())
.and_then(move |ep| determine_ep_state(ep, &item));
if ret.is_ok() {
future::ready(Some(ret))
}
else {
} else {
future::ready(None)
}
});
}
});
// Filter errors, Index updatable episodes, return insertables.
filter_episodes(episodes)
// Batch index insertable episodes.
@ -103,23 +101,25 @@ where
S: Stream<Item = Result<IndexState<NewEpisode>, DataError>> + Send + 'a,
{
stream
.try_filter_map(|state| async move {
match state {
IndexState::NotChanged => Ok(None),
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
.try_filter_map(|state| {
async move {
match state {
IndexState::NotChanged => Ok(None),
// Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => {
ep.update(rowid)
.map_err(|err| error!("{}", err))
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
.ok();
Ok(None)
Ok(None)
}
IndexState::Index(s) => Ok(Some(s)),
}
IndexState::Index(s) => Ok(Some(s)),
}
})
// only Index is left, collect them for batch index
.try_collect()
.try_collect()
}
fn batch_insert_episodes(episodes: &[NewEpisode]) {
@ -146,9 +146,9 @@ fn batch_insert_episodes(episodes: &[NewEpisode]) {
#[cfg(test)]
mod tests {
use failure::Error;
use futures::executor::block_on;
use rss::Channel;
use tokio;
use futures::executor::block_on;
use crate::database::truncate_db;
use crate::dbqueries;

View File

@ -159,7 +159,7 @@ impl Source {
let code = res.status();
if code.is_success() {
// If request is succesful save the etag
// If request is successful save the etag
self = self.update_etag(&res)?
} else {
match code.as_u16() {
@ -189,7 +189,7 @@ impl Source {
return Err(DataError::FeedRedirect(self));
}
401 => return Err(self.make_err("401: Unauthorized.", code)),
403 => return Err(self.make_err("403: Forbidden.", code)),
403 => return Err(self.make_err("403: Forbidden.", code)),
404 => return Err(self.make_err("404: Not found.", code)),
408 => return Err(self.make_err("408: Request Timeout.", code)),
410 => return Err(self.make_err("410: Feed was deleted..", code)),
@ -240,23 +240,24 @@ impl Source {
self,
client: Client<HttpsConnector<HttpConnector>>,
) -> Result<Feed, DataError> {
let id = self.id();
let resp = self.get_response(&client).await?;
let chan = response_to_channel(resp).await?;
FeedBuilder::default()
.channel(chan)
.source_id(id)
.build()
.map_err(From::from)
.map_err(From::from)
}
async fn get_response(self, client: &Client<HttpsConnector<HttpConnector>>) -> Result<Response<Body>, DataError> {
async fn get_response(
self,
client: &Client<HttpsConnector<HttpConnector>>,
) -> Result<Response<Body>, DataError> {
let mut source = self;
loop
{
loop {
match source.request_constructor(&client.clone()).await {
Ok(response) => return Ok(response),
Err(err) => match err {
@ -267,7 +268,7 @@ impl Source {
e => return Err(e),
},
}
};
}
}
async fn request_constructor(
@ -305,14 +306,12 @@ impl Source {
}
let res = client.request(req).await?;
//.map_err(From::from)
//.map_err(From::from)
self.match_status(res)
}
}
async fn response_to_channel(
res: Response<Body>,
) -> Result<Channel, DataError> {
async fn response_to_channel(res: Response<Body>) -> Result<Channel, DataError> {
let chunk = hyper::body::to_bytes(res.into_body()).await?;
let buf = String::from_utf8_lossy(&chunk).into_owned();
Channel::from_str(&buf).map_err(From::from)

View File

@ -20,7 +20,7 @@
// FIXME:
//! Docs.
use futures::{future::ok, future::lazy, prelude::*, stream::FuturesUnordered};
use futures::{future::ok, prelude::*, stream::FuturesUnordered};
use tokio;
use hyper::client::HttpConnector;
@ -42,18 +42,18 @@ type HttpsClient = Client<HttpsConnector<HttpConnector>>;
/// Convert `rss::Channel` into `Feed` -> Index Podcast -> Index Episodes.
pub async fn pipeline<'a, S>(mut sources: S, client: HttpsClient)
where
S: Stream<Item = Result<Source, DataError>> + Send + 'a + std::marker::Unpin
S: Stream<Item = Result<Source, DataError>> + Send + 'a + std::marker::Unpin,
{
while let Some(source_result) = sources.next().await {
if let Ok(source) = source_result {
match source.into_feed(client.clone()).await {
Ok(feed) => {
let fut = lazy(|_| feed.index().map_err(|err| error!("Error: {}", err)));
let fut = feed.index().map_err(|err| error!("Error: {}", err));
tokio::spawn(fut);
},
}
// Avoid spamming the stderr when it's not an actual error
Err(DataError::FeedNotModified(_)) => (),
Err(err) => error!("Error: {}", err),
Err(err) => error!("Error: {}", err),
};
}
}