Initial switch to using r2d2.

This commit is contained in:
Jordan Petridis 2017-11-19 22:06:10 +02:00
parent 052988a2b1
commit 55442529a8
No known key found for this signature in database
GPG Key ID: CEABAD9F5683B9A6
9 changed files with 175 additions and 149 deletions
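The gist of the change: the old `Database` handle (an `Arc<Mutex<SqliteConnection>>` that funneled every query through one lock) is replaced by a global r2d2 connection pool, so the rayon-parallelized indexing code can check out a connection per worker instead of contending on a mutex. A minimal sketch of the new access pattern, against the r2d2 0.7 API this commit pins (the database path is illustrative):

```rust
extern crate diesel;
extern crate r2d2;
extern crate r2d2_diesel;

use diesel::sqlite::SqliteConnection;
use r2d2_diesel::ConnectionManager;

fn main() {
    // r2d2 0.7 still takes an explicit Config; later releases use a builder.
    let config = r2d2::Config::default();
    let manager = ConnectionManager::<SqliteConnection>::new("/tmp/hammond-example.db");
    let pool = r2d2::Pool::new(config, manager).expect("Failed to create pool.");

    // Each caller checks out its own connection; it returns to the
    // pool automatically when `con` is dropped.
    let con = pool.get().unwrap();
    // The pooled guard derefs to SqliteConnection, hence the `&*con`
    // pattern throughout this diff.
    let _: &SqliteConnection = &*con;
}
```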

Cargo.lock generated
View File

@@ -25,6 +25,11 @@ name = "ansi_term"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "antidote"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "atk-sys"
version = "0.4.0"
@@ -544,6 +549,8 @@ dependencies = [
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2-diesel 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1009,6 +1016,25 @@ name = "quote"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "r2d2"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"scheduled-thread-pool 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "r2d2-diesel"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"diesel 0.16.0 (git+https://github.com/diesel-rs/diesel.git)",
"r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.3.18"
@@ -1148,6 +1174,14 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "scoped-tls"
version = "0.1.0"
@@ -1521,6 +1555,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum advapi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e06588080cb19d0acb6739808aafa5f26bfb2ca015b2b6370028b44cf7cb8a9a"
"checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699"
"checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455"
"checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5"
"checksum atk-sys 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c69658a4e18d5c9575f716e24559645d08a4044d6946c30c2e0025952c84d842"
"checksum atty 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21e50800ec991574876040fff8ee46b136a53e985286fbe6a3bdfe6421b78860"
"checksum backtrace 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8709cc7ec06f6f0ae6c2c7e12f6ed41540781f72b488d83734978295ceae182e"
@@ -1622,6 +1657,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum precomputed-hash 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
"checksum quick-xml 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)" = "19a3a610544419c527d5f51ae1a6ae3db533e25c117d3eed8fce6434f70c5e95"
"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
"checksum r2d2 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "2c8284508b38df440f8f3527395e23c4780b22f74226b270daf58fee38e4bcce"
"checksum r2d2-diesel 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f6b921696a6c45991296d21b52ed973b9fb56f6c47524fda1f99458c2d6c0478"
"checksum rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6475140dfd8655aeb72e1fd4b7a1cc1c202be65d71669476e392fe62532b9edd"
"checksum rayon 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ed02d09394c94ffbdfdc755ad62a132e94c3224a8354e78a1200ced34df12edf"
"checksum rayon-core 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e64b609139d83da75902f88fd6c01820046840a18471e4dfcd5ac7c0f46bea53"
@@ -1637,6 +1674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda"
"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f"
"checksum schannel 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7554288337c1110e34d7a2433518d889374c1de1a45f856b7bcddb03702131fc"
"checksum scheduled-thread-pool 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d9fbe48ead32343b76f544c85953bf260ed39219a8bbbb62cd85f6a00f9644f"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27"
"checksum secur32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f412dfa83308d893101dd59c10d6fda8283465976c28c287c5c855bf8d216bc"

View File

@@ -10,6 +10,8 @@ dotenv = "0.10.1"
error-chain = "0.11.0"
lazy_static = "0.2.10"
log = "0.3.8"
r2d2 = "0.7.4"
r2d2-diesel = "0.16.0"
rayon = "0.9.0"
reqwest = "0.8.1"
rfc822_sanitizer = "0.3.3"
@@ -23,7 +25,6 @@ git = "https://github.com/diesel-rs/diesel.git"
[dependencies.diesel_codegen]
features = ["sqlite"]
# version = "0.16.0"
git = "https://github.com/diesel-rs/diesel.git"
[dev-dependencies]

View File

@@ -8,22 +8,14 @@ extern crate rss;
extern crate tempdir;
extern crate test;
use diesel::prelude::*;
use rayon::prelude::*;
use rand::Rng;
use test::Bencher;
use hammond_data::utils::run_migration_on;
use hammond_data::models::NewSource;
use hammond_data::feed::{index, Feed};
use hammond_data::Database;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
struct TempDB(tempdir::TempDir, PathBuf, SqliteConnection);
// Big rss feed
const PCPER: &[u8] = include_bytes!("feeds/pcpermp3.xml");
@@ -40,57 +32,35 @@ static URLS: &[(&[u8], &str)] = &[
(LAS, "https://feeds2.feedburner.com/TheLinuxActionShow"),
];
/// Create and return a Temporary DB.
/// Will be destroyed once the returned variable(s) are dropped.
fn get_temp_db() -> TempDB {
let mut rng = rand::thread_rng();
let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap();
let db_path = tmp_dir
.path()
.join(format!("hammonddb_{}.db", rng.gen::<usize>()));
let db = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap();
run_migration_on(&db).unwrap();
TempDB(tmp_dir, db_path, db)
}
fn index_urls(m: &Database) {
fn index_urls() {
URLS.par_iter()
.map(|&(buff, url)| {
// Create and insert a Source into db
let s = NewSource::new_with_uri(url).into_source(m).unwrap();
let s = NewSource::new_with_uri(url).into_source().unwrap();
// parse it into a channel
let chan = rss::Channel::read_from(BufReader::new(buff)).unwrap();
Feed::new_from_channel_source(chan, s)
Feed::from_channel_source(chan, s)
})
.for_each(|feed| {
index(m, &mut [feed]);
index(&mut [feed]);
});
}
#[bench]
fn bench_index_feeds(b: &mut Bencher) {
let TempDB(_tmp_dir, _db_path, db) = get_temp_db();
let m = Arc::new(Mutex::new(db));
b.iter(|| {
index_urls(&Arc::clone(&m));
index_urls();
});
}
#[bench]
fn bench_index_unchanged_feeds(b: &mut Bencher) {
let TempDB(_tmp_dir, _db_path, db) = get_temp_db();
let m = Arc::new(Mutex::new(db));
// Index first so it will only bench the comparison test case.
index_urls(&Arc::clone(&m));
index_urls();
b.iter(|| {
for _ in 0..10 {
index_urls(&Arc::clone(&m));
index_urls();
}
});
}

View File

@@ -8,22 +8,29 @@ use chrono::prelude::*;
/// Random db queries helper functions.
/// Probably needs cleanup.
pub fn get_sources(con: &SqliteConnection) -> QueryResult<Vec<Source>> {
use POOL;
pub fn get_sources() -> QueryResult<Vec<Source>> {
use schema::source::dsl::*;
source.load::<Source>(con)
let con = POOL.get().unwrap();
let s = source.load::<Source>(&*con);
// s.iter().for_each(|x| println!("{:#?}", x));
s
}
pub fn get_podcasts(con: &SqliteConnection) -> QueryResult<Vec<Podcast>> {
pub fn get_podcasts() -> QueryResult<Vec<Podcast>> {
use schema::podcast::dsl::*;
podcast.load::<Podcast>(con)
let con = POOL.get().unwrap();
podcast.load::<Podcast>(&*con)
}
pub fn get_episodes(con: &SqliteConnection) -> QueryResult<Vec<Episode>> {
pub fn get_episodes() -> QueryResult<Vec<Episode>> {
use schema::episode::dsl::*;
episode.order(epoch.desc()).load::<Episode>(con)
let con = POOL.get().unwrap();
episode.order(epoch.desc()).load::<Episode>(&*con)
}
pub fn get_downloaded_episodes(con: &SqliteConnection) -> QueryResult<Vec<Episode>> {
@@ -104,10 +111,11 @@ pub fn get_pd_episodes_limit(
.load::<Episode>(con)
}
pub fn get_source_from_uri(con: &SqliteConnection, uri_: &str) -> QueryResult<Source> {
pub fn get_source_from_uri(uri_: &str) -> QueryResult<Source> {
use schema::source::dsl::*;
source.filter(uri.eq(uri_)).get_result::<Source>(con)
let con = POOL.get().unwrap();
source.filter(uri.eq(uri_)).get_result::<Source>(&*con)
}
pub fn get_podcast_from_title(con: &SqliteConnection, title_: &str) -> QueryResult<Podcast> {
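The conversion pattern above is uniform: each helper drops its `con: &SqliteConnection` parameter, checks a connection out of the global `POOL`, and hands diesel `&*con`, relying on the pooled guard's deref to `SqliteConnection`. Callers no longer thread a connection through at all; a hedged usage sketch (assuming the `dbqueries` module is exported, as its `pub fn` items suggest):

```rust
extern crate hammond_data;

use hammond_data::dbqueries;

fn main() {
    // Each call internally checks its own connection out of the pool.
    let sources = dbqueries::get_sources().expect("failed to load sources");
    let episodes = dbqueries::get_episodes().expect("failed to load episodes");
    println!("{} sources, {} episodes", sources.len(), episodes.len());
}
```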

View File

@@ -6,13 +6,11 @@ use rss;
use dbqueries;
use parser;
use Database;
use POOL;
use models::{Podcast, Source};
use errors::*;
use std::sync::Arc;
#[derive(Debug)]
pub struct Feed {
@@ -21,42 +19,42 @@ pub struct Feed {
}
impl Feed {
pub fn new_from_source(db: &Database, s: Source) -> Result<Feed> {
s.refresh(db)
pub fn from_source(s: Source) -> Result<Feed> {
s.refresh()
}
pub fn new_from_channel_source(chan: rss::Channel, s: Source) -> Feed {
pub fn from_channel_source(chan: rss::Channel, s: Source) -> Feed {
Feed {
channel: chan,
source: s,
}
}
fn index(&self, db: &Database) -> Result<()> {
let pd = self.index_channel(db)?;
fn index(&self) -> Result<()> {
let pd = self.index_channel()?;
self.index_channel_items(db, &pd)?;
self.index_channel_items(&pd)?;
Ok(())
}
fn index_channel(&self, db: &Database) -> Result<Podcast> {
fn index_channel(&self) -> Result<Podcast> {
let pd = parser::new_podcast(&self.channel, *self.source.id());
// Convert NewPodcast to Podcast
pd.into_podcast(db)
pd.into_podcast()
}
// TODO: Refactor transactions and find a way to do it in parallel.
fn index_channel_items(&self, db: &Database, pd: &Podcast) -> Result<()> {
fn index_channel_items(&self, pd: &Podcast) -> Result<()> {
let items = self.channel.items();
let episodes: Vec<_> = items
.into_par_iter()
.map(|item| parser::new_episode(item, *pd.id()))
.collect();
let tempdb = db.lock().unwrap();
let tempdb = POOL.clone().get().unwrap();
let _ = tempdb.transaction::<(), Error, _>(|| {
episodes.into_iter().for_each(|x| {
let e = x.index(&tempdb);
let e = x.index(&*tempdb);
if let Err(err) = e {
error!("Failed to index episode: {:?}.", x);
error!("Error msg: {}", err);
@@ -68,17 +66,17 @@ impl Feed {
}
}
pub fn index_all(db: &Database) -> Result<()> {
let mut f = fetch_all(db)?;
pub fn index_all() -> Result<()> {
let mut f = fetch_all()?;
index(db, &mut f);
index(&mut f);
info!("Indexing done.");
Ok(())
}
pub fn index(db: &Database, feeds: &mut [Feed]) {
pub fn index(feeds: &mut [Feed]) {
feeds.into_par_iter().for_each(|f| {
let e = f.index(&Arc::clone(db));
let e = f.index();
if e.is_err() {
error!("Error While trying to update the database.");
error!("Error msg: {}", e.unwrap_err());
@@ -86,22 +84,19 @@ pub fn index(db: &Database, feeds: &mut [Feed]) {
});
}
pub fn fetch_all(db: &Database) -> Result<Vec<Feed>> {
let feeds = {
let conn = db.lock().unwrap();
dbqueries::get_sources(&conn)?
};
pub fn fetch_all() -> Result<Vec<Feed>> {
let feeds = dbqueries::get_sources()?;
let results = fetch(db, feeds);
let results = fetch(feeds);
Ok(results)
}
pub fn fetch(db: &Database, feeds: Vec<Source>) -> Vec<Feed> {
pub fn fetch(feeds: Vec<Source>) -> Vec<Feed> {
let results: Vec<_> = feeds
.into_par_iter()
.filter_map(|x| {
let uri = x.uri().to_owned();
let l = Feed::new_from_source(&Arc::clone(db), x);
let l = Feed::from_source(x);
if l.is_ok() {
l.ok()
} else {
@@ -118,46 +113,17 @@ pub fn fetch(db: &Database, feeds: Vec<Source>) -> Vec<Feed> {
#[cfg(test)]
mod tests {
extern crate rand;
extern crate tempdir;
use diesel::prelude::*;
use rss;
use self::rand::Rng;
use models::NewSource;
use utils::run_migration_on;
use std::io::BufReader;
use std::path::PathBuf;
use std::fs;
use std::sync::Mutex;
use std::io::BufReader;
use super::*;
struct TempDB(tempdir::TempDir, PathBuf, SqliteConnection);
/// Create and return a Temporary DB.
/// Will be destroyed once the returned variable(s) are dropped.
fn get_temp_db() -> TempDB {
let mut rng = rand::thread_rng();
let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap();
let db_path = tmp_dir
.path()
.join(format!("hammonddb_{}.db", rng.gen::<usize>()));
let db = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap();
run_migration_on(&db).unwrap();
TempDB(tmp_dir, db_path, db)
}
#[test]
/// Insert feeds and update/index them.
fn test_index_loop() {
let TempDB(_tmp_dir, _db_path, db) = get_temp_db();
let db = Arc::new(Mutex::new(db));
let inpt = vec![
"https://request-for-explanation.github.io/podcast/rss.xml",
"https://feeds.feedburner.com/InterceptedWithJeremyScahill",
@@ -166,23 +132,17 @@ mod tests {
];
inpt.iter().for_each(|feed| {
NewSource::new_with_uri(feed)
.into_source(&db.clone())
.unwrap();
NewSource::new_with_uri(feed).into_source().unwrap();
});
index_all(&db).unwrap();
index_all().unwrap();
// Run again to cover unique constraint errors.
index_all(&db).unwrap();
index_all().unwrap();
}
#[test]
fn test_complete_index() {
let TempDB(_tmp_dir, _db_path, db) = get_temp_db();
// complete_index runs in parallel so it requires a mutex as an argument.
let m = Arc::new(Mutex::new(db));
// vec of (path, url) tuples.
let urls = vec![
(
@@ -195,7 +155,7 @@ ),
),
(
"tests/feeds/TheBreakthrough.xml",
"http://feeds.feedburner.com/propublica/podcast",
"http://feeds.propublica.org/propublica/podcast",
),
(
"tests/feeds/R4Explanation.xml",
@@ -206,25 +166,22 @@
let mut feeds: Vec<_> = urls.iter()
.map(|&(path, url)| {
// Create and insert a Source into db
let s = NewSource::new_with_uri(url)
.into_source(&m.clone())
.unwrap();
let s = NewSource::new_with_uri(url).into_source().unwrap();
// open the xml file
let feed = fs::File::open(path).unwrap();
// parse it into a channel
let chan = rss::Channel::read_from(BufReader::new(feed)).unwrap();
Feed::new_from_channel_source(chan, s)
Feed::from_channel_source(chan, s)
})
.collect();
// Index the channels
index(&m, &mut feeds);
index(&mut feeds);
// Assert the index rows equal the controlled results
let tempdb = m.lock().unwrap();
assert_eq!(dbqueries::get_sources(&tempdb).unwrap().len(), 4);
assert_eq!(dbqueries::get_podcasts(&tempdb).unwrap().len(), 4);
assert_eq!(dbqueries::get_episodes(&tempdb).unwrap().len(), 274);
assert_eq!(dbqueries::get_sources().unwrap().len(), 4);
assert_eq!(dbqueries::get_podcasts().unwrap().len(), 4);
assert_eq!(dbqueries::get_episodes().unwrap().len(), 274);
}
}
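With the pool global, the whole indexing pipeline is argument-free: `fetch_all()` loads every `Source` through `dbqueries::get_sources()`, `fetch()` refreshes them in parallel into `Feed`s, and `index()` writes each feed's podcast and episodes inside a transaction. A hedged usage sketch (module paths assumed from the crate layout):

```rust
extern crate hammond_data;

use hammond_data::feed;

fn main() {
    // Refreshes every stored Source over HTTP in parallel, parses the
    // responses, and indexes the results through the global pool.
    feed::index_all().expect("indexing failed");
}
```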

View File

@@ -16,6 +16,8 @@ extern crate diesel;
extern crate diesel_codegen;
extern crate chrono;
extern crate r2d2;
extern crate r2d2_diesel;
extern crate rayon;
extern crate reqwest;
extern crate rfc822_sanitizer;
@@ -55,11 +57,49 @@ lazy_static!{
HAMMOND_XDG.create_cache_directory(HAMMOND_XDG.get_cache_home()).unwrap()
};
static ref DB_PATH: PathBuf = {
HAMMOND_XDG.place_data_file("hammond.db").unwrap()
};
pub static ref DL_DIR: PathBuf = {
HAMMOND_XDG.create_data_directory("Downloads").unwrap()
};
pub static ref DB_PATH: PathBuf = HAMMOND_XDG.place_data_file("hammond.db").unwrap();
}
#[cfg(not(test))]
lazy_static! {
pub static ref POOL: utils::Pool = utils::init_pool(DB_PATH.to_str().unwrap());
}
#[cfg(test)]
lazy_static! {
static ref TEMPDB: TempDB = get_temp_db();
pub static ref POOL: &'static utils::Pool = &TEMPDB.2;
}
#[cfg(test)]
struct TempDB(tempdir::TempDir, PathBuf, utils::Pool);
#[cfg(test)]
extern crate rand;
#[cfg(test)]
extern crate tempdir;
#[cfg(test)]
use rand::Rng;
#[cfg(test)]
/// Create and return a Temporary DB.
/// Will be destroyed once the returned variable(s) are dropped.
fn get_temp_db() -> TempDB {
let mut rng = rand::thread_rng();
let tmp_dir = tempdir::TempDir::new("hammond_unit_test").unwrap();
let db_path = tmp_dir
.path()
.join(format!("hammonddb_{}.db", rng.gen::<usize>()));
let pool = utils::init_pool(db_path.to_str().unwrap());
let db = pool.get().unwrap();
utils::run_migration_on(&db).unwrap();
TempDB(tmp_dir, db_path, pool)
}
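The `#[cfg(test)]` arm is the subtle part: `POOL` borrows the pool out of a lazily built `TempDB`, and keeping the whole triple in a static holds the `tempdir::TempDir` guard alive, so the on-disk test database survives for the entire test run. Any unit test can then hit the database with zero setup; a hedged sketch (the test module and body are illustrative):

```rust
#[cfg(test)]
mod pool_smoke {
    use super::*;
    use diesel::prelude::*;

    #[test]
    fn pool_hands_out_live_connections() {
        // POOL derefs to &'static utils::Pool here; get() checks a
        // connection out of the temp-db pool built by get_temp_db().
        let con = POOL.get().unwrap();
        assert!(con.execute("SELECT 1").is_ok());
    }
}
```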

View File

@@ -3,7 +3,7 @@ use diesel;
use schema::{episode, podcast, source};
use models::{Podcast, Source};
use Database;
use POOL;
use errors::*;
use dbqueries;
@@ -26,21 +26,20 @@ impl<'a> NewSource<'a> {
}
}
fn index(&self, db: &Database) {
fn index(&self) {
use schema::source::dsl::*;
let tempdb = db.lock().unwrap();
let tempdb = POOL.clone().get().unwrap();
// Throw away the result like `insert or ignore`
// Diesel does not support `insert or ignore` yet.
let _ = diesel::insert_into(source).values(self).execute(&*tempdb);
}
// Look out for when TryInto lands in stable.
pub fn into_source(self, db: &Database) -> QueryResult<Source> {
self.index(db);
pub fn into_source(self) -> QueryResult<Source> {
self.index();
let tempdb = db.lock().unwrap();
dbqueries::get_source_from_uri(&tempdb, self.uri)
dbqueries::get_source_from_uri(self.uri)
}
}
@@ -104,28 +103,28 @@ pub struct NewPodcast {
impl NewPodcast {
// Look out for when TryInto lands in stable.
pub fn into_podcast(self, db: &Database) -> Result<Podcast> {
self.index(db)?;
let tempdb = db.lock().unwrap();
Ok(dbqueries::get_podcast_from_title(&tempdb, &self.title)?)
pub fn into_podcast(self) -> Result<Podcast> {
self.index()?;
let tempdb = POOL.clone().get().unwrap();
Ok(dbqueries::get_podcast_from_title(&*tempdb, &self.title)?)
}
fn index(&self, db: &Database) -> QueryResult<()> {
fn index(&self) -> QueryResult<()> {
use schema::podcast::dsl::*;
let pd = {
let tempdb = db.lock().unwrap();
dbqueries::get_podcast_from_title(&tempdb, &self.title)
let tempdb = POOL.clone().get().unwrap();
dbqueries::get_podcast_from_title(&*tempdb, &self.title)
};
match pd {
Ok(foo) => if foo.link() != self.link {
let tempdb = db.lock().unwrap();
let tempdb = POOL.clone().get().unwrap();
diesel::replace_into(podcast)
.values(self)
.execute(&*tempdb)?;
},
Err(_) => {
let tempdb = db.lock().unwrap();
let tempdb = POOL.clone().get().unwrap();
diesel::insert_into(podcast).values(self).execute(&*tempdb)?;
}
}
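A side note on the `POOL.clone().get()` calls above: an r2d2 `Pool` is a reference-counted handle over shared state, so the clone copies a pointer, not a second set of connections. A self-contained sketch of that property (in-memory SQLite, illustrative):

```rust
extern crate diesel;
extern crate r2d2;
extern crate r2d2_diesel;

use diesel::sqlite::SqliteConnection;
use r2d2_diesel::ConnectionManager;

fn main() {
    let manager = ConnectionManager::<SqliteConnection>::new(":memory:");
    let pool = r2d2::Pool::new(r2d2::Config::default(), manager).unwrap();

    // Both handles drain the same pool; checkouts through either
    // count against the same connection limit.
    let a = pool.clone();
    let b = pool.clone();
    let _c1 = a.get().unwrap();
    let _c2 = b.get().unwrap();
}
```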

View File

@@ -11,6 +11,7 @@ use errors::*;
use models::insertables::NewPodcast;
use Database;
use POOL;
use std::io::Read;
use std::str::FromStr;
@@ -265,7 +266,7 @@ impl<'a> Source {
/// Extract ETag and LastModified from req, and update self and the
/// corresponding db row.
fn update_etag(&mut self, db: &Database, req: &reqwest::Response) -> Result<()> {
fn update_etag(&mut self, req: &reqwest::Response) -> Result<()> {
let headers = req.headers();
// let etag = headers.get_raw("ETag").unwrap();
@@ -278,18 +279,18 @@ {
{
self.http_etag = etag.map(|x| x.tag().to_string().to_owned());
self.last_modified = lmod.map(|x| format!("{}", x));
self.save(db)?;
self.save()?;
}
Ok(())
}
pub fn save(&self, db: &Database) -> QueryResult<Source> {
let tempdb = db.lock().unwrap();
pub fn save(&self) -> QueryResult<Source> {
let tempdb = POOL.clone().get().unwrap();
self.save_changes::<Source>(&*tempdb)
}
pub fn refresh(mut self, db: &Database) -> Result<Feed> {
pub fn refresh(mut self) -> Result<Feed> {
use reqwest::header::{ETag, EntityTag, Headers, HttpDate, LastModified};
let mut headers = Headers::new();
@@ -322,12 +323,12 @@ // _ => (),
// _ => (),
// };
self.update_etag(db, &req)?;
self.update_etag(&req)?;
let mut buf = String::new();
req.read_to_string(&mut buf)?;
let chan = Channel::from_str(&buf)?;
Ok(Feed::new_from_channel_source(chan, self))
Ok(Feed::from_channel_source(chan, self))
}
}
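`refresh()` is doing a conditional GET: it replays the stored ETag and Last-Modified values so an unchanged feed can answer 304 instead of a full body. A hedged sketch of that request shape with the reqwest 0.8 typed headers (the helper is illustrative, not this crate's API):

```rust
extern crate reqwest;

use reqwest::header::{EntityTag, Headers, IfNoneMatch};

// Ask the server to skip the body if our cached copy is current.
fn conditional_get(url: &str, etag: Option<&str>) -> Result<reqwest::Response, reqwest::Error> {
    let mut headers = Headers::new();
    if let Some(tag) = etag {
        // If-None-Match makes an unchanged resource reply 304 Not Modified.
        headers.set(IfNoneMatch::Items(vec![EntityTag::new(false, tag.to_owned())]));
    }
    let client = reqwest::Client::new();
    client.get(url).headers(headers).send()
}
```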

View File

@@ -2,6 +2,10 @@ use rayon::prelude::*;
use diesel::prelude::*;
use chrono::prelude::*;
use r2d2;
use diesel::sqlite::SqliteConnection;
use r2d2_diesel::ConnectionManager;
use errors::*;
use dbqueries;
use Database;
@@ -15,11 +19,19 @@ use DB_PATH;
embed_migrations!("migrations/");
pub type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
pub fn init() -> Result<()> {
let conn = establish_connection();
run_migration_on(&conn)
}
pub fn init_pool(db_path: &str) -> Pool {
let config = r2d2::Config::default();
let manager = ConnectionManager::<SqliteConnection>::new(db_path);
r2d2::Pool::new(config, manager).expect("Failed to create pool.")
}
pub fn run_migration_on(connection: &SqliteConnection) -> Result<()> {
info!("Running DB Migrations...");
embedded_migrations::run(connection)?;
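Taken together, startup now amounts to building the pool once and running migrations over a single checked-out connection. A hedged sketch of the sequence (the `bootstrap` wrapper is illustrative; `Result` is this crate's error-chain alias):

```rust
fn bootstrap(db_path: &str) -> Result<Pool> {
    let pool = init_pool(db_path);
    {
        // Deref coercion turns &PooledConnection into the
        // &SqliteConnection that run_migration_on() expects.
        let con = pool.get().unwrap();
        run_migration_on(&con)?;
    }
    Ok(pool)
}
```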