mirror of https://github.com/djcb/mu.git
mu-store/indexer: consume messages from workers
Add store::consume_message, which is like add message but std::move from the caller such that the messages longer has copies (with Xapian::Document) on the caller side; this is to avoid threading issues.
This commit is contained in:
parent
ceed832cb9
commit
aeb6d44172
|
@ -241,19 +241,17 @@ Indexer::Private::add_message(const std::string& path)
|
|||
* The reason for having the lock is some helgrind warnings;
|
||||
* but it believed those are _false alarms_
|
||||
* https://gitlab.gnome.org/GNOME/glib/-/issues/2662
|
||||
*
|
||||
* For now, set the lock as we were seeing some db corruption.
|
||||
*/
|
||||
std::unique_lock lock{w_lock_};
|
||||
//std::unique_lock lock{w_lock_};
|
||||
auto msg{Message::make_from_path(path, store_.message_options())};
|
||||
if (!msg) {
|
||||
mu_warning("failed to create message from {}: {}", path, msg.error().what());
|
||||
return false;
|
||||
}
|
||||
// if the store was empty, we know that the message is completely new
|
||||
// and can use the fast path (Xapians 'add_document' rather tahn
|
||||
// and can use the fast path (Xapians 'add_document' rather than
|
||||
// 'replace_document)
|
||||
auto res = store_.add_message(msg.value(), was_empty_);
|
||||
auto res = store_.consume_message(std::move(msg.value()), was_empty_);
|
||||
if (!res) {
|
||||
mu_warning("failed to add message @ {}: {}", path, res.error().what());
|
||||
return false;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
** Copyright (C) 2021-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||
** Copyright (C) 2021-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||
**
|
||||
** This program is free software; you can redistribute it and/or modify it
|
||||
** under the terms of the GNU General Public License as published by the
|
||||
|
|
|
@ -204,8 +204,22 @@ public:
|
|||
*
|
||||
* @return the doc id of the added message or an error.
|
||||
*/
|
||||
Result<Id> add_message(Message& msg, bool is_new = false);
|
||||
Result<Id> add_message(const std::string& path, bool is_new = false);
|
||||
Result<Id> add_message(Message &msg, bool is_new = false);
|
||||
Result<Id> add_message(const std::string &path, bool is_new = false);
|
||||
|
||||
/**
|
||||
* Like add_message(), however, this consumes the message and disposes
|
||||
* of it when the function ends. This can be useful when injecting
|
||||
* messages from a worker thread, to ensure no Xapian::Documents
|
||||
* live in different threads.
|
||||
*
|
||||
* @param msg a message
|
||||
* @param is_new whether this is a completely new message
|
||||
*/
|
||||
Result<Id> consume_message(Message&& msg, bool is_new = false) {
|
||||
Message consumed{std::move(msg)};
|
||||
return add_message(consumed, is_new);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a message from the store. It will _not_ remove the message
|
||||
|
|
Loading…
Reference in New Issue