Convert more functions to "async fn"
This commit is contained in:
parent
429356a217
commit
f9d577f596
@ -21,7 +21,6 @@
|
|||||||
#![allow(clippy::unit_arg)]
|
#![allow(clippy::unit_arg)]
|
||||||
//! Index Feeds.
|
//! Index Feeds.
|
||||||
|
|
||||||
use futures::future::*;
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
use rss;
|
use rss;
|
||||||
@ -54,7 +53,7 @@ impl Feed {
|
|||||||
NewShow::new(&self.channel, self.source_id)
|
NewShow::new(&self.channel, self.source_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn index_channel_items(self, pd: Show) -> impl Future<Output = Result<(), DataError>> + Send {
|
async fn index_channel_items(self, pd: Show) -> Result<(), DataError> {
|
||||||
let stream = stream::iter(self.channel.into_items());
|
let stream = stream::iter(self.channel.into_items());
|
||||||
// Parse the episodes
|
// Parse the episodes
|
||||||
let episodes = stream.filter_map(move |item| {
|
let episodes = stream.filter_map(move |item| {
|
||||||
@ -67,9 +66,9 @@ impl Feed {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
// Filter errors, Index updatable episodes, return insertables.
|
// Filter errors, Index updatable episodes, return insertables.
|
||||||
filter_episodes(episodes)
|
let insertable_episodes = filter_episodes(episodes).await?;
|
||||||
// Batch index insertable episodes.
|
batch_insert_episodes(&insertable_episodes);
|
||||||
.and_then(|eps| ok(batch_insert_episodes(&eps)))
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,32 +93,31 @@ fn determine_ep_state(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn filter_episodes<'a, S>(
|
async fn filter_episodes<'a, S>(stream: S) -> Result<Vec<NewEpisode>, DataError>
|
||||||
stream: S,
|
|
||||||
) -> impl Future<Output = Result<Vec<NewEpisode>, DataError>> + Send + 'a
|
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<IndexState<NewEpisode>, DataError>> + Send + 'a,
|
S: Stream<Item = Result<IndexState<NewEpisode>, DataError>>,
|
||||||
{
|
{
|
||||||
stream
|
stream
|
||||||
.try_filter_map(|state| {
|
.try_filter_map(|state| {
|
||||||
async move {
|
async {
|
||||||
match state {
|
let result = match state {
|
||||||
IndexState::NotChanged => Ok(None),
|
IndexState::NotChanged => None,
|
||||||
// Update individual rows, and filter them
|
// Update individual rows, and filter them
|
||||||
IndexState::Update((ref ep, rowid)) => {
|
IndexState::Update((ref ep, rowid)) => {
|
||||||
ep.update(rowid)
|
ep.update(rowid)
|
||||||
.map_err(|err| error!("{}", err))
|
.map_err(|err| error!("{}", err))
|
||||||
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
|
.map_err(|_| error!("Failed to index episode: {:?}.", ep.title()))
|
||||||
.ok();
|
.ok();
|
||||||
|
None
|
||||||
Ok(None)
|
|
||||||
}
|
}
|
||||||
IndexState::Index(s) => Ok(Some(s)),
|
IndexState::Index(s) => Some(s),
|
||||||
}
|
};
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
// only Index is left, collect them for batch index
|
// only Index is left, collect them for batch index
|
||||||
.try_collect()
|
.try_collect()
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn batch_insert_episodes(episodes: &[NewEpisode]) {
|
fn batch_insert_episodes(episodes: &[NewEpisode]) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user