diff --git a/lib/index/mu-indexer.cc b/lib/index/mu-indexer.cc index ed1c32fe..2a8f83c9 100644 --- a/lib/index/mu-indexer.cc +++ b/lib/index/mu-indexer.cc @@ -70,13 +70,12 @@ struct Indexer::Private { [this](auto&& path, auto&& statbuf, auto&& info) { return handler(path, statbuf, info); }}, - max_message_size_{store_.metadata().max_message_size}, - batch_size_{store_.metadata().batch_size} + max_message_size_{store_.metadata().max_message_size} { g_message("created indexer for %s -> %s (batch-size: %zu)", store.metadata().root_maildir.c_str(), store.metadata().database_path.c_str(), - batch_size_); + store.metadata().batch_size); } ~Private() { stop(); } @@ -104,10 +103,8 @@ struct Indexer::Private { AsyncQueue fq_; - Progress progress_; - IndexState state_; - const size_t batch_size_; /**< Max number of messages added before - * committing */ + Progress progress_; + IndexState state_; std::mutex lock_, wlock_; }; @@ -194,11 +191,6 @@ Indexer::Private::worker() g_debug("started worker"); - /* note that transaction starting/committing is opportunistic, - * and are NOPs if we are already/not in a transaction */ - - store_.begin_transaction(); - while (state_ == IndexState::Scanning || !fq_.empty()) { if (!fq_.pop(item, 250ms)) continue; @@ -209,22 +201,15 @@ Indexer::Private::worker() try { std::unique_lock lock{lock_}; - store_.add_message(item); + store_.add_message(item, true /*use-transaction*/); ++progress_.updated; - if (progress_.updated % batch_size_ == 0) { - store_.commit_transaction(); - store_.begin_transaction(); - } - } catch (const Mu::Error& er) { g_warning("error adding message @ %s: %s", item.c_str(), er.what()); } maybe_start_worker(); } - - store_.commit_transaction(); } bool @@ -246,7 +231,6 @@ Indexer::Private::cleanup() return state_ == IndexState::Cleaning; }); - // No need for transactions here, remove_messages does that for us. 
if (orphans.empty()) g_debug("nothing to clean up"); else { @@ -295,6 +279,8 @@ Indexer::Private::start(const Indexer::Config& conf) while (!fq_.empty()) std::this_thread::sleep_for(100ms); + store_.commit(); + if (conf_.cleanup) { g_debug("starting cleanup"); state_.change_to(IndexState::Cleaning); diff --git a/lib/mu-store.cc b/lib/mu-store.cc index 4a7c9897..182d2e9e 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -139,8 +139,7 @@ struct Store::Private { xapian_try([&] { writable_db().set_metadata(ContactsKey, contacts_.serialize()); }); - if (in_transaction_) - commit_transaction(); + transaction_maybe_commit(true /*force*/); } } @@ -181,31 +180,33 @@ struct Store::Private { return dynamic_cast(*db_.get()); } - void begin_transaction() noexcept + // If not started yet, start a transaction. Otherwise, just update the transaction size. + void transaction_inc() noexcept { if (mdata_.in_memory) - return; // not supported in the in-memory backend. + return; // not supported - g_return_if_fail(!in_transaction_); - g_debug("starting transaction"); - xapian_try([this] { - writable_db().begin_transaction(); - in_transaction_ = true; - }); + if (transaction_size_ == 0) { + g_debug("starting transaction"); + xapian_try([this] { writable_db().begin_transaction(); }); + } + ++transaction_size_; } - void commit_transaction() noexcept + // Opportunistically commit a transaction if the transaction size + // filled up a batch, or with force. + void transaction_maybe_commit(bool force = false) noexcept { - if (mdata_.in_memory) - return; // not supported in the in-memory backend. 
+ if (mdata_.in_memory || transaction_size_ == 0) + return; // not supported or not in transaction - g_return_if_fail(in_transaction_); - g_debug("committing modification(s)"); - xapian_try([this] { - if (in_transaction_) + if (force || transaction_size_ >= mdata_.batch_size) { + g_debug("committing transaction (n=%zu)", transaction_size_); + xapian_try([this] { writable_db().commit_transaction(); - in_transaction_ = false; - }); + transaction_size_ = 0; + }); + } } void add_synonyms() @@ -279,8 +280,8 @@ struct Store::Private { Contacts contacts_; std::unique_ptr indexer_; - std::atomic in_transaction_{}; - std::mutex lock_; + size_t transaction_size_{}; + std::mutex lock_; }; static void @@ -406,7 +407,7 @@ maildir_from_path(const std::string& root, const std::string& path) } unsigned -Store::add_message(const std::string& path) +Store::add_message(const std::string& path, bool use_transaction) { LOCKED; @@ -418,7 +419,14 @@ Store::add_message(const std::string& path) "failed to create message: %s", gerr ? 
gerr->message : "something went wrong"}; + if (use_transaction) + priv_->transaction_inc(); + const auto docid{priv_->add_or_update_msg(0, msg)}; + + if (use_transaction) /* commit if batch is full */ + priv_->transaction_maybe_commit(); + mu_msg_unref(msg); if (G_UNLIKELY(docid == InvalidId)) @@ -461,16 +469,17 @@ Store::remove_message(const std::string& path) void Store::remove_messages(const std::vector& ids) { - begin_transaction(); + LOCKED; + + priv_->transaction_inc(); xapian_try([&] { - LOCKED; for (auto&& id : ids) { priv_->writable_db().delete_document(id); } }); - commit_transaction(); + priv_->transaction_maybe_commit(true /*force*/); } time_t @@ -547,6 +556,13 @@ Store::for_each_message_path(Store::ForEachMessageFunc msg_func) const return n; } +void +Store::commit() +{ + LOCKED; + priv_->transaction_maybe_commit(true /*force*/); +} + static MuMsgFieldId field_id(const std::string& field) { @@ -585,26 +601,6 @@ Store::for_each_term(const std::string& field, Store::ForEachTermFunc func) cons return n; } -void -Store::begin_transaction() -{ - xapian_try([&] { - LOCKED; - if (!priv_->in_transaction_) - priv_->begin_transaction(); - }); -} - -void -Store::commit_transaction() -{ - xapian_try([&] { - LOCKED; - if (priv_->in_transaction_) - priv_->commit_transaction(); - }); -} - static void add_terms_values_date(Xapian::Document& doc, MuMsg* msg, MuMsgFieldId mfid) { diff --git a/lib/mu-store.hh b/lib/mu-store.hh index 8fdeed60..2722ec35 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -136,13 +136,16 @@ public: Indexer& indexer(); /** - * Add a message to the store. + * Add a message to the store. When planning to write many messages, + * it's much faster to do so in a transaction. If so, set + * @use_transaction to true. When done with adding messages, call commit(). * * @param path the message path. 
+ * @param use_transaction whether to bundle up to batch_size changes in a transaction * * @return the doc id of the added message */ - Id add_message(const std::string& path); + Id add_message(const std::string& path, bool use_transaction = false); /** * Update a message in the store. @@ -279,17 +282,10 @@ public: bool empty() const; /** - * Start a Xapian transaction, opportunistically. If a transaction - * is already underway, do nothing. + * Commit the current batch of modifications to disk, opportunistically. + * If no transaction is underway, do nothing. */ - void begin_transaction(); - - /** - * Commit the current group of modifications (i.e., transaction) to - * disk, opportunistically. If no transaction is underway, do - * nothing. - */ - void commit_transaction(); + void commit(); /** * Get a reference to the private data. For internal use.