Moved indexing episode into a NewEpisode method.

There is a performance regression caused by the introduction of
Arc<Mutex<Connection>> instead of Connection that should be refactored.

Also removed the db transaction as it was incomplete.
This commit is contained in:
Jordan Petridis 2017-11-17 20:01:59 +02:00
parent 282a29e7dd
commit e66a337468
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
2 changed files with 41 additions and 34 deletions

View File

@ -1,7 +1,6 @@
use diesel::prelude::*; use diesel::prelude::*;
use rayon::prelude::*; use rayon::prelude::*;
use diesel;
use rss; use rss;
use dbqueries; use dbqueries;
@ -45,48 +44,26 @@ impl Feed {
pd.into_podcast(db) pd.into_podcast(db)
} }
// TODO: Figure out transactions.
// The synchronous version where there was a db.lock() before the episodes.iter()
// is actually faster.
fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> { fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> {
let it = self.channel.items(); let it = self.channel.items();
let episodes: Vec<_> = it.par_iter() let episodes: Vec<_> = it.par_iter()
.map(|x| feedparser::parse_episode(x, pd.id())) .map(|x| feedparser::parse_episode(x, pd.id()))
.collect(); .collect();
let conn = db.lock().unwrap(); episodes.into_par_iter().for_each(|x| {
let e = conn.transaction::<(), Error, _>(|| { let e = x.index(&Arc::clone(db));
// TODO: if let Err(err) = e {
episodes.iter().for_each(|x| { error!("Failed to index episode: {:?}.", x);
let e = index_episode(&conn, x); error!("Error msg: {}", err);
if let Err(err) = e { };
error!("Failed to index episode: {:?}.", x);
error!("Error msg: {}", err);
};
});
Ok(())
}); });
drop(conn); Ok(())
e
} }
} }
// TODO: Currently using diesel from master git.
// Watch out for v0.99.0 beta and change the toml.
/// Upsert a single episode row: insert it when the uri is unknown,
/// replace the stored row only when its title or published date changed,
/// and leave it untouched otherwise.
fn index_episode(con: &SqliteConnection, ep: &NewEpisode) -> QueryResult<()> {
    use schema::episode::dsl::*;

    if let Ok(existing) = dbqueries::get_episode_from_uri(con, ep.uri.unwrap()) {
        // Only rewrite the row when the feed's metadata actually differs.
        let changed = existing.title() != ep.title
            || existing.published_date() != ep.published_date.as_ref().map(|x| x.as_str());
        if changed {
            diesel::replace_into(episode).values(ep).execute(con)?;
        }
    } else {
        // No row for this uri yet — plain insert.
        diesel::insert_into(episode).values(ep).execute(con)?;
    }
    Ok(())
}
pub fn full_index_loop(db: &Database) -> Result<()> { pub fn full_index_loop(db: &Database) -> Result<()> {
let mut f = fetch_all_feeds(db)?; let mut f = fetch_all_feeds(db)?;

View File

@ -59,6 +59,36 @@ pub struct NewEpisode<'a> {
pub podcast_id: i32, pub podcast_id: i32,
} }
impl<'a> NewEpisode<'a> {
    // TODO: Currently using diesel from master git.
    // Watch out for v0.99.0 beta and change the toml.
    // TODO: Refactor into batch indexes instead.
    /// Upsert this episode: insert when the uri is unknown, replace the
    /// stored row when its title or published date changed, no-op otherwise.
    pub fn index(&self, db: &Database) -> QueryResult<()> {
        use schema::episode::dsl::*;

        // Take the lock only for the lookup; the guard is dropped at the end
        // of this block so the write below re-acquires it separately.
        let existing = {
            let tempdb = db.lock().unwrap();
            dbqueries::get_episode_from_uri(&tempdb, self.uri.unwrap())
        };

        if let Ok(foo) = existing {
            // Rewrite the row only when the feed's metadata actually differs.
            let changed = foo.title() != self.title
                || foo.published_date() != self.published_date.as_ref().map(|x| x.as_str());
            if changed {
                let tempdb = db.lock().unwrap();
                diesel::replace_into(episode)
                    .values(self)
                    .execute(&*tempdb)?;
            }
        } else {
            // No row for this uri yet — plain insert.
            let tempdb = db.lock().unwrap();
            diesel::insert_into(episode).values(self).execute(&*tempdb)?;
        }
        Ok(())
    }
}
#[derive(Insertable)] #[derive(Insertable)]
#[table_name = "podcast"] #[table_name = "podcast"]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -78,7 +108,7 @@ impl NewPodcast {
Ok(dbqueries::get_podcast_from_title(&tempdb, &self.title)?) Ok(dbqueries::get_podcast_from_title(&tempdb, &self.title)?)
} }
fn index(&self, db: &Database) -> Result<()> { fn index(&self, db: &Database) -> QueryResult<()> {
use schema::podcast::dsl::*; use schema::podcast::dsl::*;
let pd = { let pd = {
let tempdb = db.lock().unwrap(); let tempdb = db.lock().unwrap();