Reverted index_channel items to the synchronous version.
Using a transaction yields ~1/3 better performance as of now.
This commit is contained in:
parent
5c84b77434
commit
563f249a48
@ -1,5 +1,6 @@
|
||||
use rayon::prelude::*;
|
||||
use diesel::Identifiable;
|
||||
use diesel::prelude::*;
|
||||
|
||||
use rss;
|
||||
|
||||
@ -44,9 +45,7 @@ impl Feed {
|
||||
pd.into_podcast(db)
|
||||
}
|
||||
|
||||
// TODO: Figure out transcactions.
|
||||
// The synchronous version where there was a db.lock() before the episodes.iter()
|
||||
// is actually faster.
|
||||
// TODO: Refactor transcactions and find a way to do it in parallel.
|
||||
fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> {
|
||||
let items = self.channel.items();
|
||||
let episodes: Vec<_> = items
|
||||
@ -54,12 +53,16 @@ impl Feed {
|
||||
.map(|item| parser::new_episode(item, *pd.id()))
|
||||
.collect();
|
||||
|
||||
episodes.into_par_iter().for_each(|ep| {
|
||||
let e = ep.index(&Arc::clone(db));
|
||||
if let Err(err) = e {
|
||||
error!("Failed to index episode: {:?}.", ep);
|
||||
error!("Error msg: {}", err);
|
||||
};
|
||||
let tempdb = db.lock().unwrap();
|
||||
let _ = tempdb.transaction::<(), Error, _>(|| {
|
||||
episodes.into_iter().for_each(|x| {
|
||||
let e = x.index(&tempdb);
|
||||
if let Err(err) = e {
|
||||
error!("Failed to index episode: {:?}.", x);
|
||||
error!("Error msg: {}", err);
|
||||
};
|
||||
});
|
||||
Ok(())
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -62,26 +62,29 @@ impl<'a> NewEpisode<'a> {
|
||||
// TODO: Currently using diesel from master git.
|
||||
// Watch out for v0.99.0 beta and change the toml.
|
||||
// TODO: Refactor into batch indexes instead.
|
||||
pub fn index(&self, db: &Database) -> QueryResult<()> {
|
||||
// TODO: Refactor so all index methods take consistent arguments
|
||||
// like NewEpisode.index wants Sqliteconnection where the other take a Database
|
||||
pub fn index(&self, con: &SqliteConnection) -> QueryResult<()> {
|
||||
use schema::episode::dsl::*;
|
||||
|
||||
let ep = {
|
||||
let tempdb = db.lock().unwrap();
|
||||
dbqueries::get_episode_from_uri(&tempdb, self.uri.unwrap())
|
||||
// let tempdb = db.lock().unwrap();
|
||||
// dbqueries::get_episode_from_uri(&tempdb, self.uri.unwrap())
|
||||
dbqueries::get_episode_from_uri(con, self.uri.unwrap())
|
||||
};
|
||||
|
||||
match ep {
|
||||
Ok(foo) => if foo.title() != self.title
|
||||
|| foo.published_date() != self.published_date.as_ref().map(|x| x.as_str())
|
||||
{
|
||||
let tempdb = db.lock().unwrap();
|
||||
diesel::replace_into(episode)
|
||||
.values(self)
|
||||
.execute(&*tempdb)?;
|
||||
// let tempdb = db.lock().unwrap();
|
||||
diesel::replace_into(episode).values(self).execute(con)?;
|
||||
// .execute(&*tempdb)?;
|
||||
},
|
||||
Err(_) => {
|
||||
let tempdb = db.lock().unwrap();
|
||||
diesel::insert_into(episode).values(self).execute(&*tempdb)?;
|
||||
// let tempdb = db.lock().unwrap();
|
||||
diesel::insert_into(episode).values(self).execute(con)?;
|
||||
// .execute(&*tempdb)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Loading…
Reference in New Issue
Block a user