/* ** Copyright (C) 2020 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the ** Free Software Foundation; either version 3, or (at your option) any ** later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software Foundation, ** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ** */ #include "mu-indexer.hh" #include #include #include #include #include #include #include #include #include using namespace std::chrono_literals; #include "mu-scanner.hh" #include "utils/mu-async-queue.hh" #include "utils/mu-error.hh" #include "../mu-store.hh" using namespace Mu; struct IndexState { enum State { Idle, Scanning, Cleaning }; static const char* name(State s) { switch (s) { case Idle: return "idle"; case Scanning: return "scanning"; case Cleaning: return "cleaning"; default: return ""; } } bool operator==(State rhs) const { return state_ == rhs; } void change_to(State new_state) { g_debug("changing indexer state %s->%s", name((State)state_), name((State)new_state)); state_ = new_state; } private: State state_{Idle}; }; struct Indexer::Private { Private(Mu::Store& store) : store_{store}, scanner_{store_.metadata().root_maildir, [this](auto&& path, auto&& statbuf, auto&& info) { return handler(path, statbuf, info); }}, max_message_size_{store_.metadata().max_message_size}, batch_size_{store_.metadata().batch_size} { g_message("created indexer for %s -> %s (batch-size: %zu)", store.metadata().root_maildir.c_str(), store.metadata().database_path.c_str(), batch_size_); } ~Private() { stop(); } bool dir_predicate(const std::string& path, const struct dirent* dirent) const; bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); void maybe_start_worker(); void worker(); bool cleanup(); bool start(const Indexer::Config& conf); bool stop(); Indexer::Config conf_; Store& store_; Scanner scanner_; const size_t max_message_size_; time_t dirstamp_{}; std::size_t max_workers_; std::vector workers_; std::thread scanner_worker_; AsyncQueue fq_; Progress progress_; IndexState state_; const size_t batch_size_; /**< Max number of messages added before * committing */ std::mutex lock_, wlock_; }; bool Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype) { switch (htype) { case Scanner::HandleType::EnterDir: case Scanner::HandleType::EnterNewCur: { // in lazy-mode, we ignore this dir if its dirstamp suggest it // is up-to-date (this is _not_ always true; hence we call it // lazy-mode); only for actual message dirs, since the dir // tstamps may not bubble up. dirstamp_ = store_.dirstamp(fullpath); if (conf_.lazy_check && dirstamp_ >= statbuf->st_mtime && htype == Scanner::HandleType::EnterNewCur) { g_debug("skip %s (seems up-to-date)", fullpath.c_str()); return false; } // don't index dirs with '.noindex' auto noindex = ::access((fullpath + "/.noindex").c_str(), F_OK) == 0; if (noindex) { g_debug("skip %s (has .noindex)", fullpath.c_str()); return false; // don't descend into this dir. } // don't index dirs with '.noupdate', unless we do a full // (re)index. if (!conf_.ignore_noupdate) { auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0; if (noupdate) { g_debug("skip %s (has .noupdate)", fullpath.c_str()); return false; } } g_debug("process %s", fullpath.c_str()); return true; } case Scanner::HandleType::LeaveDir: { store_.set_dirstamp(fullpath, ::time(NULL)); return true; } case Scanner::HandleType::File: { if ((size_t)statbuf->st_size > max_message_size_) { g_debug("skip %s (too big: %" G_GINT64_FORMAT " bytes)", fullpath.c_str(), (gint64)statbuf->st_size); return false; } // if the message is not in the db yet, or not up-to-date, queue // it for updating/inserting. if (statbuf->st_mtime <= dirstamp_ && store_.contains_message(fullpath)) { // g_debug ("skip %s: already up-to-date"); return false; } fq_.push(std::string{fullpath}); return true; } default: g_return_val_if_reached(false); return false; } } void Indexer::Private::maybe_start_worker() { std::lock_guard wlock{wlock_}; if (fq_.size() > workers_.size() && workers_.size() < max_workers_) workers_.emplace_back(std::thread([this] { worker(); })); } void Indexer::Private::worker() { std::string item; g_debug("started worker"); /* note that transaction starting/committing is opportunistic, * and are NOPs if we are already/not in a transaction */ store_.begin_transaction(); while (state_ == IndexState::Scanning || !fq_.empty()) { if (!fq_.pop(item, 250ms)) continue; // g_debug ("popped (n=%zu) path %s", fq_.size(), item.c_str()); ++progress_.processed; try { std::unique_lock lock{lock_}; store_.add_message(item); ++progress_.updated; if (progress_.updated % batch_size_ == 0) { store_.commit_transaction(); store_.begin_transaction(); } } catch (const Mu::Error& er) { g_warning("error adding message @ %s: %s", item.c_str(), er.what()); } maybe_start_worker(); } store_.commit_transaction(); } bool Indexer::Private::cleanup() { g_debug("starting cleanup"); size_t n{}; std::vector orphans; // store messages without files. store_.for_each_message_path([&](Store::Id id, const std::string& path) { ++n; if (::access(path.c_str(), R_OK) != 0) { g_debug("cannot read %s (id=%u); queueing for removal from store", path.c_str(), id); orphans.emplace_back(id); } return state_ == IndexState::Cleaning; }); // No need for transactions here, remove_messages does that for us. if (orphans.empty()) g_debug("nothing to clean up"); else { g_debug("removing up %zu stale message(s) from store", orphans.size()); store_.remove_messages(orphans); progress_.removed += orphans.size(); } return true; } bool Indexer::Private::start(const Indexer::Config& conf) { stop(); conf_ = conf; if (conf_.max_threads == 0) max_workers_ = std::thread::hardware_concurrency(); else max_workers_ = conf.max_threads; g_debug("starting indexer with <= %zu worker thread(s)", max_workers_); g_debug("indexing: %s; clean-up: %s", conf_.scan ? "yes" : "no", conf_.cleanup ? "yes" : "no"); workers_.emplace_back(std::thread([this] { worker(); })); state_.change_to(IndexState::Scanning); scanner_worker_ = std::thread([this] { progress_ = {}; if (conf_.scan) { g_debug("starting scanner"); if (!scanner_.start()) { // blocks. g_warning("failed to start scanner"); goto leave; } g_debug("scanner finished with %zu file(s) in queue", fq_.size()); } // now there may still be messages in the work queue... // finish those; this is a bit ugly; perhaps we should // handle SIGTERM etc. while (!fq_.empty()) std::this_thread::sleep_for(100ms); if (conf_.cleanup) { g_debug("starting cleanup"); state_.change_to(IndexState::Cleaning); cleanup(); g_debug("cleanup finished"); } leave: state_.change_to(IndexState::Idle); }); g_debug("started indexer"); return true; } bool Indexer::Private::stop() { scanner_.stop(); state_.change_to(IndexState::Idle); const auto w_n = workers_.size(); fq_.clear(); if (scanner_worker_.joinable()) scanner_worker_.join(); for (auto&& w : workers_) if (w.joinable()) w.join(); workers_.clear(); if (w_n > 0) g_debug("stopped indexer (joined %zu worker(s))", w_n); return true; } Indexer::Indexer(Store& store) : priv_{std::make_unique(store)} {} Indexer::~Indexer() = default; bool Indexer::start(const Indexer::Config& conf) { const auto mdir{priv_->store_.metadata().root_maildir}; if (G_UNLIKELY(access(mdir.c_str(), R_OK) != 0)) { g_critical("'%s' is not readable: %s", mdir.c_str(), g_strerror(errno)); return false; } std::lock_guard l(priv_->lock_); if (is_running()) return true; return priv_->start(conf); } bool Indexer::stop() { std::lock_guard l(priv_->lock_); if (!is_running()) return true; g_debug("stopping indexer"); return priv_->stop(); } bool Indexer::is_running() const { return !(priv_->state_ == IndexState::Idle) || !priv_->fq_.empty(); } Indexer::Progress Indexer::progress() const { priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true; return priv_->progress_; }