diff --git a/hammond-data/src/feed.rs b/hammond-data/src/feed.rs
index c1ddaa2..c2ee2e6 100644
--- a/hammond-data/src/feed.rs
+++ b/hammond-data/src/feed.rs
@@ -25,23 +25,21 @@ pub struct Feed {
 
 impl Feed {
     /// Index the contents of the RSS `Feed` into the database.
-    pub fn index(self) -> Box<Future<Item = (), Error = DataError> + Send> {
-        let fut = self.parse_podcast_async()
+    pub fn index(self) -> impl Future<Item = (), Error = DataError> + Send {
+        self.parse_podcast_async()
             .and_then(|pd| pd.to_podcast())
-            .and_then(move |pd| self.index_channel_items(pd));
-
-        Box::new(fut)
+            .and_then(move |pd| self.index_channel_items(pd))
     }
 
     fn parse_podcast(&self) -> NewPodcast {
         NewPodcast::new(&self.channel, self.source_id)
     }
 
-    fn parse_podcast_async(&self) -> Box<Future<Item = NewPodcast, Error = DataError> + Send> {
-        Box::new(ok(self.parse_podcast()))
+    fn parse_podcast_async(&self) -> impl Future<Item = NewPodcast, Error = DataError> + Send {
+        ok(self.parse_podcast())
     }
 
-    fn index_channel_items(self, pd: Podcast) -> Box<Future<Item = (), Error = DataError> + Send> {
+    fn index_channel_items(self, pd: Podcast) -> impl Future<Item = (), Error = DataError> + Send {
         let stream = stream::iter_ok::<_, DataError>(self.channel.into_items());
 
         // Parse the episodes
@@ -52,11 +50,9 @@ impl Feed {
         });
 
         // Filter errors, Index updatable episodes, return insertables.
-        let insertables = filter_episodes(episodes);
-        // Batch index insertable episodes.
-        let idx = insertables.and_then(|eps| ok(batch_insert_episodes(&eps)));
-
-        Box::new(idx)
+        filter_episodes(episodes)
+            // Batch index insertable episodes.
+            .and_then(|eps| ok(batch_insert_episodes(&eps)))
     }
 }
 
@@ -87,11 +83,11 @@ fn determine_ep_state(
 
 fn filter_episodes<'a, S>(
     stream: S,
-) -> Box<Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a>
+) -> impl Future<Item = Vec<NewEpisode>, Error = DataError> + Send + 'a
 where
     S: Stream<Item = IndexState<NewEpisode>, Error = DataError> + Send + 'a,
 {
-    let list = stream.filter_map(|state| match state {
+    stream.filter_map(|state| match state {
         IndexState::NotChanged => None,
         // Update individual rows, and filter them
         IndexState::Update((ref ep, rowid)) => {
@@ -105,9 +101,7 @@ where
         IndexState::Index(s) => Some(s),
     })
     // only Index is left, collect them for batch index
-    .collect();
-
-    Box::new(list)
+    .collect()
 }
 
 fn batch_insert_episodes(episodes: &[NewEpisode]) {
diff --git a/hammond-data/src/models/source.rs b/hammond-data/src/models/source.rs
index d3ba80c..9035db4 100644
--- a/hammond-data/src/models/source.rs
+++ b/hammond-data/src/models/source.rs
@@ -177,7 +177,7 @@ impl Source {
         self,
         client: Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
-    ) -> Box<Future<Item = Feed, Error = DataError>> {
+    ) -> impl Future<Item = Feed, Error = DataError> {
         let id = self.id();
         let response = loop_fn(self, move |source| {
             source
@@ -194,7 +194,7 @@ impl Source {
             })
         });
 
-        let feed = response
+        response
             .and_then(response_to_channel)
             .and_then(move |chan| {
                 FeedBuilder::default()
@@ -202,9 +202,7 @@ impl Source {
                     .source_id(id)
                     .build()
                     .map_err(From::from)
-            });
-
-        Box::new(feed)
+            })
     }
 
     // TODO: make ignore_etags an Enum for better ergonomics.
@@ -213,7 +211,7 @@ impl Source {
         self,
         client: &Client<HttpsConnector<HttpConnector>>,
         ignore_etags: bool,
-    ) -> Box<Future<Item = Response, Error = DataError>> {
+    ) -> impl Future<Item = Response, Error = DataError> {
        // FIXME: remove unwrap somehow
        let uri = Uri::from_str(self.uri()).unwrap();
        let mut req = Request::new(Method::Get, uri);
@@ -234,25 +232,22 @@ impl Source {
             }
         }
 
-        let work = client
+        client
             .request(req)
             .map_err(From::from)
-            .and_then(move |res| self.match_status(res));
-        Box::new(work)
+            .and_then(move |res| self.match_status(res))
     }
 }
 
 #[allow(needless_pass_by_value)]
-fn response_to_channel(res: Response) -> Box<Future<Item = Channel, Error = DataError> + Send> {
-    let chan = res.body()
+fn response_to_channel(res: Response) -> impl Future<Item = Channel, Error = DataError> + Send {
+    res.body()
         .concat2()
         .map(|x| x.into_iter())
         .map_err(From::from)
         .map(|iter| iter.collect::<Vec<u8>>())
         .map(|utf_8_bytes| String::from_utf8_lossy(&utf_8_bytes).into_owned())
-        .and_then(|buf| Channel::from_str(&buf).map_err(From::from));
-
-    Box::new(chan)
+        .and_then(|buf| Channel::from_str(&buf).map_err(From::from))
 }
 
 #[cfg(test)]
diff --git a/hammond-data/src/pipeline.rs b/hammond-data/src/pipeline.rs
index c1bea16..b31ac88 100644
--- a/hammond-data/src/pipeline.rs
+++ b/hammond-data/src/pipeline.rs
@@ -50,24 +50,22 @@ pub fn pipeline<'a, S>(
     sources: S,
     ignore_etags: bool,
     client: &HttpsClient,
-) -> Box<Future<Item = Vec<()>, Error = DataError> + 'a>
+) -> impl Future<Item = Vec<()>, Error = DataError> + 'a
 where
     S: Stream<Item = Source, Error = DataError> + 'a,
 {
-    let pipeline = sources
+    sources
         .and_then(clone!(client => move |s| s.into_feed(client.clone(), ignore_etags)))
         .and_then(|feed| rayon::scope(|s| s.spawn_future(feed.index())))
         // the stream will stop at the first error so
         // we ensure that everything will succeded regardless.
         .map_err(|err| error!("Error: {}", err))
         .then(|_| ok::<(), DataError>(()))
-        .collect();
-
-    Box::new(pipeline)
+        .collect()
 }
 
 /// Creates a tokio `reactor::Core`, and a `hyper::Client` and
-/// runs the pipeline.
+/// runs the pipeline to completion. The `reactor::Core` is dropped afterwards.
 pub fn run<S>(sources: S, ignore_etags: bool) -> Result<(), DataError>
 where
     S: IntoIterator<Item = Source>,