Use impl Trait syntax instead of Trait Objects.

Rust v1.26 introduced impl Trait which avoid the heap allocation with Box
and makes the code a bit more ergonomic.

For more see https://blog.rust-lang.org/2018/05/10/Rust-1.26.html
This commit is contained in:
Jordan Petridis 2018-05-11 12:06:31 +03:00
parent 777a2102f8
commit 041684b13a
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
3 changed files with 25 additions and 38 deletions

View File

@ -25,23 +25,21 @@ pub struct Feed {
impl Feed { impl Feed {
/// Index the contents of the RSS `Feed` into the database. /// Index the contents of the RSS `Feed` into the database.
pub fn index(self) -> Box<Future<Item = (), Error = DataError> + Send> { pub fn index(self) -> impl Future<Item = (), Error = DataError> + Send {
let fut = self.parse_podcast_async() self.parse_podcast_async()
.and_then(|pd| pd.to_podcast()) .and_then(|pd| pd.to_podcast())
.and_then(move |pd| self.index_channel_items(pd)); .and_then(move |pd| self.index_channel_items(pd))
Box::new(fut)
} }
fn parse_podcast(&self) -> NewPodcast { fn parse_podcast(&self) -> NewPodcast {
NewPodcast::new(&self.channel, self.source_id) NewPodcast::new(&self.channel, self.source_id)
} }
fn parse_podcast_async(&self) -> Box<Future<Item = NewPodcast, Error = DataError> + Send> { fn parse_podcast_async(&self) -> impl Future<Item = NewPodcast, Error = DataError> + Send {
Box::new(ok(self.parse_podcast())) ok(self.parse_podcast())
} }
fn index_channel_items(self, pd: Podcast) -> Box<Future<Item = (), Error = DataError> + Send> { fn index_channel_items(self, pd: Podcast) -> impl Future<Item = (), Error = DataError> + Send {
let stream = stream::iter_ok::<_, DataError>(self.channel.into_items()); let stream = stream::iter_ok::<_, DataError>(self.channel.into_items());
// Parse the episodes // Parse the episodes
@ -52,11 +50,9 @@ impl Feed {
}); });
// Filter errors, Index updatable episodes, return insertables. // Filter errors, Index updatable episodes, return insertables.
let insertables = filter_episodes(episodes); filter_episodes(episodes)
// Batch index insertable episodes. // Batch index insertable episodes.
let idx = insertables.and_then(|eps| ok(batch_insert_episodes(&eps))); .and_then(|eps| ok(batch_insert_episodes(&eps)))
Box::new(idx)
} }
} }
@ -87,11 +83,11 @@ fn determine_ep_state(
fn filter_episodes<'a, S>( fn filter_episodes<'a, S>(
stream: S, stream: S,
) -> Box<Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a> ) -> impl Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a
where where
S: Stream<Item = IndexState<NewEpisode>, Error = DataError> + Send + 'a, S: Stream<Item = IndexState<NewEpisode>, Error = DataError> + Send + 'a,
{ {
let list = stream.filter_map(|state| match state { stream.filter_map(|state| match state {
IndexState::NotChanged => None, IndexState::NotChanged => None,
// Update individual rows, and filter them // Update individual rows, and filter them
IndexState::Update((ref ep, rowid)) => { IndexState::Update((ref ep, rowid)) => {
@ -105,9 +101,7 @@ where
IndexState::Index(s) => Some(s), IndexState::Index(s) => Some(s),
}) })
// only Index is left, collect them for batch index // only Index is left, collect them for batch index
.collect(); .collect()
Box::new(list)
} }
fn batch_insert_episodes(episodes: &[NewEpisode]) { fn batch_insert_episodes(episodes: &[NewEpisode]) {

View File

@ -177,7 +177,7 @@ impl Source {
self, self,
client: Client<HttpsConnector<HttpConnector>>, client: Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool, ignore_etags: bool,
) -> Box<Future<Item = Feed, Error = DataError>> { ) -> impl Future<Item = Feed, Error = DataError> {
let id = self.id(); let id = self.id();
let response = loop_fn(self, move |source| { let response = loop_fn(self, move |source| {
source source
@ -194,7 +194,7 @@ impl Source {
}) })
}); });
let feed = response response
.and_then(response_to_channel) .and_then(response_to_channel)
.and_then(move |chan| { .and_then(move |chan| {
FeedBuilder::default() FeedBuilder::default()
@ -202,9 +202,7 @@ impl Source {
.source_id(id) .source_id(id)
.build() .build()
.map_err(From::from) .map_err(From::from)
}); })
Box::new(feed)
} }
// TODO: make ignore_etags an Enum for better ergonomics. // TODO: make ignore_etags an Enum for better ergonomics.
@ -213,7 +211,7 @@ impl Source {
self, self,
client: &Client<HttpsConnector<HttpConnector>>, client: &Client<HttpsConnector<HttpConnector>>,
ignore_etags: bool, ignore_etags: bool,
) -> Box<Future<Item = Response, Error = DataError>> { ) -> impl Future<Item = Response, Error = DataError> {
// FIXME: remove unwrap somehow // FIXME: remove unwrap somehow
let uri = Uri::from_str(self.uri()).unwrap(); let uri = Uri::from_str(self.uri()).unwrap();
let mut req = Request::new(Method::Get, uri); let mut req = Request::new(Method::Get, uri);
@ -234,25 +232,22 @@ impl Source {
} }
} }
let work = client client
.request(req) .request(req)
.map_err(From::from) .map_err(From::from)
.and_then(move |res| self.match_status(res)); .and_then(move |res| self.match_status(res))
Box::new(work)
} }
} }
#[allow(needless_pass_by_value)] #[allow(needless_pass_by_value)]
fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = DataError> + Send> { fn response_to_channel(res: Response) -> impl Future<Item = Channel, Error = DataError> + Send {
let chan = res.body() res.body()
.concat2() .concat2()
.map(|x| x.into_iter()) .map(|x| x.into_iter())
.map_err(From::from) .map_err(From::from)
.map(|iter| iter.collect::<Vec<u8>>()) .map(|iter| iter.collect::<Vec<u8>>())
.map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned()) .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
.and_then(|buf| Channel::from_str(&buf).map_err(From::from)); .and_then(|buf| Channel::from_str(&buf).map_err(From::from))
Box::new(chan)
} }
#[cfg(test)] #[cfg(test)]

View File

@ -50,24 +50,22 @@ pub fn pipeline<'a, S>(
sources: S, sources: S,
ignore_etags: bool, ignore_etags: bool,
client: &HttpsClient, client: &HttpsClient,
) -> Box<Future<Item = Vec<()>, Error = DataError> + 'a> ) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
where where
S: Stream<Item = Source, Error = DataError> + 'a, S: Stream<Item = Source, Error = DataError> + 'a,
{ {
let pipeline = sources sources
.and_then(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags))) .and_then(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags)))
.and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index()))) .and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index())))
// the stream will stop at the first error so // the stream will stop at the first error so
// we ensure that everything will succeded regardless. // we ensure that everything will succeded regardless.
.map_err(|err| error!("Error: {}", err)) .map_err(|err| error!("Error: {}", err))
.then(|_| ok::<(), DataError>(())) .then(|_| ok::<(), DataError>(()))
.collect(); .collect()
Box::new(pipeline)
} }
/// Creates a tokio `reactor::Core`, and a `hyper::Client` and /// Creates a tokio `reactor::Core`, and a `hyper::Client` and
/// runs the pipeline. /// runs the pipeline to completion. The `reactor::Core` is dropped afterwards.
pub fn run<S>(sources: S, ignore_etags: bool) -> Result<(), DataError> pub fn run<S>(sources: S, ignore_etags: bool) -> Result<(), DataError>
where where
S: IntoIterator<Item = Source>, S: IntoIterator<Item = Source>,