diff --git a/lib/index/mu-indexer.cc b/lib/index/mu-indexer.cc index a8cbf37d..3b578e63 100644 --- a/lib/index/mu-indexer.cc +++ b/lib/index/mu-indexer.cc @@ -72,11 +72,13 @@ struct Indexer::Private { [this](auto&& path, auto&& statbuf, auto&& info) { return handler(path, statbuf, info); }}, - max_message_size_{store_.metadata().max_message_size} + max_message_size_{store_.metadata().max_message_size}, + batch_size_{store_.metadata().batch_size} { - g_message("created indexer for %s -> %s", + g_message("created indexer for %s -> %s (batch-size: %zu)", store.metadata().root_maildir.c_str(), - store.metadata().database_path.c_str()); + store.metadata().database_path.c_str(), + batch_size_); } ~Private() { stop(); } @@ -104,8 +106,10 @@ struct Indexer::Private { AsyncQueue fq_; - Progress progress_; - IndexState state_; + Progress progress_; + IndexState state_; + const size_t batch_size_; /**< Max number of messages added before + * committing */ std::mutex lock_, wlock_; }; @@ -192,6 +196,11 @@ Indexer::Private::worker() g_debug("started worker"); + /* note that transaction starting/committing is opportunistic, + * and are NOPs if we are already/not in a transaction */ + + store_.begin_transaction(); + while (state_ == IndexState::Scanning || !fq_.empty()) { if (!fq_.pop(item, 250ms)) continue; @@ -200,15 +209,24 @@ Indexer::Private::worker() ++progress_.processed; try { + std::unique_lock lock{lock_}; + store_.add_message(item); ++progress_.updated; + if (progress_.updated % batch_size_ == 0) { + store_.commit_transaction(); + store_.begin_transaction(); + } + } catch (const Mu::Error& er) { g_warning("error adding message @ %s: %s", item.c_str(), er.what()); } maybe_start_worker(); } + + store_.commit_transaction(); } bool @@ -230,9 +248,14 @@ Indexer::Private::cleanup() return state_ == IndexState::Cleaning; }); - g_debug("remove %zu message(s) from store", orphans.size()); - store_.remove_messages(orphans); - progress_.removed += orphans.size(); + // No need for transactions here, remove_messages does that for us. + if (orphans.empty()) + g_debug("nothing to clean up"); + else { + g_debug("removing up %zu stale message(s) from store", orphans.size()); + store_.remove_messages(orphans); + progress_.removed += orphans.size(); + } return true; } @@ -280,14 +303,11 @@ Indexer::Private::start(const Indexer::Config& conf) cleanup(); g_debug("cleanup finished"); } - - store_.commit(); leave: state_.change_to(IndexState::Idle); }); g_debug("started indexer"); - return true; } diff --git a/lib/index/mu-indexer.hh b/lib/index/mu-indexer.hh index 1d6f4c49..d79ed7cb 100644 --- a/lib/index/mu-indexer.hh +++ b/lib/index/mu-indexer.hh @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020 Dirk-Jan C. Binnema +** Copyright (C) 2021 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the diff --git a/lib/mu-store.cc b/lib/mu-store.cc index cc776b17..e1b1a372 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -110,8 +110,6 @@ struct Store::Private { mdata_{make_metadata(path)}, contacts_{db().get_metadata(ContactsKey), mdata_.personal_addresses} { - if (!readonly) - begin_transaction(); } Private(const std::string& path, @@ -122,7 +120,6 @@ struct Store::Private { mdata_{init_metadata(conf, path, root_maildir, personal_addresses)}, contacts_{"", mdata_.personal_addresses} { - begin_transaction(); } Private(const std::string& root_maildir, @@ -140,15 +137,14 @@ struct Store::Private { if (!read_only_) { xapian_try([&] { writable_db().set_metadata(ContactsKey, contacts_.serialize()); - commit(); }); + if (in_transaction_) + commit_transaction(); } } std::unique_ptr make_xapian_db(const std::string db_path, XapianOpts opts) try { - in_transaction_ = false; - switch (opts) { case XapianOpts::ReadOnly: return std::make_unique(db_path); case XapianOpts::Open: @@ -184,32 +180,30 @@ struct Store::Private { return dynamic_cast(*db_.get()); } - void dirty() - { - if (++dirtiness_ > mdata_.batch_size) - xapian_try([this] { commit(); }); - } - void begin_transaction() noexcept { + if (mdata_.in_memory) + return; // not supported in the in-memory backend. + g_return_if_fail(!in_transaction_); + g_debug("starting transaction"); xapian_try([this] { writable_db().begin_transaction(); in_transaction_ = true; }); } - void commit() noexcept + void commit_transaction() noexcept { - g_debug("committing %zu modification(s)", dirtiness_); - dirtiness_ = 0; if (mdata_.in_memory) return; // not supported in the in-memory backend. + + g_return_if_fail(in_transaction_); + g_debug("committing modification(s)"); xapian_try([this] { if (in_transaction_) writable_db().commit_transaction(); in_transaction_ = false; - begin_transaction(); }); } @@ -286,9 +280,6 @@ struct Store::Private { std::atomic in_transaction_{}; std::mutex lock_; - size_t dirtiness_{}; - - mutable std::atomic ref_count_{1}; }; static void @@ -433,7 +424,6 @@ Store::add_message(const std::string& path) throw Error{Error::Code::Message, "failed to add message"}; g_debug("added message @ %s; docid = %u", path.c_str(), docid); - priv_->dirty(); return docid; } @@ -447,7 +437,6 @@ Store::update_message(MuMsg* msg, unsigned docid) throw Error{Error::Code::Internal, "failed to update message"}; g_debug("updated message @ %s; docid = %u", mu_msg_get_path(msg), docid); - priv_->dirty(); return true; } @@ -462,7 +451,6 @@ Store::remove_message(const std::string& path) priv_->writable_db().delete_document(term); g_debug("deleted message @ %s from store", path.c_str()); - priv_->dirty(); return true; }, @@ -472,13 +460,16 @@ Store::remove_message(const std::string& path) void Store::remove_messages(const std::vector& ids) { + begin_transaction(); + xapian_try([&] { LOCKED; for (auto&& id : ids) { priv_->writable_db().delete_document(id); - priv_->dirty(); } }); + + commit_transaction(); } time_t @@ -502,7 +493,6 @@ Store::set_dirstamp(const std::string& path, time_t tstamp) const std::size_t len = g_snprintf(data.data(), data.size(), "%zx", (size_t)tstamp); priv_->writable_db().set_metadata(path, std::string{data.data(), len}); - priv_->dirty(); } MuMsg* @@ -595,11 +585,22 @@ Store::for_each_term(const std::string& field, Store::ForEachTermFunc func) cons } void -Store::commit() +Store::begin_transaction() { xapian_try([&] { LOCKED; - priv_->commit(); + if (!priv_->in_transaction_) + priv_->begin_transaction(); + }); +} + +void +Store::commit_transaction() +{ + xapian_try([&] { + LOCKED; + if (priv_->in_transaction_) + priv_->commit_transaction(); }); } diff --git a/lib/mu-store.hh b/lib/mu-store.hh index 8b8a413c..83b22447 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -279,11 +279,17 @@ class Store { bool empty() const; /** - * Commit the current group of modifications (i.e., transaction) to disk; - * This rarely needs to be called explicitly, as Store will take care of - * it. + * Start a Xapian transaction, opportunistically. If a transaction + * is already underway, do nothing. */ - void commit(); + void begin_transaction(); + + /** + * Commit the current group of modifications (i.e., transaction) to + * disk, opportunistically. If no transaction is underway, do + * nothing. + */ + void commit_transaction(); /** * Get a reference to the private data. For internal use.