server: refactor allow-for-temp-file handling

Add a helper OutputStream class so both "normal" and temp-file code can
be handled uniformly.
This commit is contained in:
Dirk-Jan C. Binnema 2023-08-15 19:23:45 +03:00
parent c554e5194a
commit 1a1eb1f906
1 changed files with 115 additions and 99 deletions

View File

@ -23,11 +23,13 @@
#include "mu-server.hh"

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstring>
#include <fstream>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <variant>
@ -49,6 +51,77 @@
using namespace Mu; using namespace Mu;
/// Output stream that writes to _either_ a temp file or an in-memory
/// stringstream, so callers can produce output the same way for both.
struct OutputStream {
	/**
	 * Construct an OutputStream backed by a newly created temp file.
	 *
	 * The file is named "mu-<monotonic-time>.eld" and is created in
	 * @p tmp_dir.
	 *
	 * @param tmp_dir dir for temp files
	 *
	 * @throws Mu::Error with Error::Code::File if the stream is not in a
	 * good state after opening the file.
	 */
	OutputStream(const std::string& tmp_dir):
		// fname_ is declared before out_, so it is initialized first and
		// can safely be used to open the stream.
		fname_{join_paths(tmp_dir,
				  mu_format("mu-{}.eld", g_get_monotonic_time()))},
		out_{std::ofstream{fname_}} {
		if (!out().good())
			throw Mu::Error{Error::Code::File, "failed to create temp-file"};
	}

	/**
	 * Construct an OutputStream backed by an in-memory stringstream; no
	 * file is associated with it.
	 */
	OutputStream(): out_{std::ostringstream{}} {}

	/**
	 * Get a writable ostream for whichever backend is active.
	 *
	 * @return an ostream
	 */
	std::ostream& out() {
		return std::visit(
			[](auto& stream) -> std::ostream& { return stream; }, out_);
	}

	/// conversion, so an OutputStream can be passed where an ostream& is
	/// expected.
	operator std::ostream&() { return out(); }

	/**
	 * Get the output as a string: either the accumulated contents (for a
	 * stringstream-backed stream, e.g. a lisp form), or the quoted full
	 * path of the temp file holding the same.
	 *
	 * NOTE(review): for the file-backed case this does not flush the
	 * stream; a reader that is handed the path presumably needs the
	 * data to be flushed first -- confirm against the callers.
	 *
	 * @return lisp form or quoted path
	 */
	std::string to_string() const {
		if (const auto* sstream{std::get_if<std::ostringstream>(&out_)}; sstream)
			return sstream->str();
		return quote(fname_);
	}

	/**
	 * Delete file, if any. Only do this when the OutputStream is no
	 * longer needed.
	 *
	 * Does nothing for a stringstream-backed stream. A failure to delete
	 * is logged as a warning, not raised.
	 */
	void unlink () {
		if (fname_.empty())
			return; // not file-backed; nothing to delete.

		if (::unlink(fname_.c_str()) != 0) {
			// unlink(2) returns -1 on failure; the reason is in errno.
			const auto err{errno};
			mu_warning("failed to unlink '{}': {}", fname_, ::strerror(err));
		}
	}

private:
	std::string fname_; ///< path of the temp file; empty when string-backed
	using OutType = std::variant<std::ofstream, std::ostringstream>;
	OutType out_;       ///< the active backend
};
/// @brief object to manage the server-context for all commands. /// @brief object to manage the server-context for all commands.
struct Server::Private { struct Server::Private {
Private(Store& store, const Server::Options& opts, Output output) Private(Store& store, const Server::Options& opts, Output output)
@ -93,7 +166,6 @@ struct Server::Private {
} }
size_t output_results(const QueryResults& qres, size_t batch_size) const; size_t output_results(const QueryResults& qres, size_t batch_size) const;
size_t output_results_temp_file(const QueryResults& qres, size_t batch_size) const;
// //
// handlers for various commands. // handlers for various commands.
@ -127,6 +199,13 @@ private:
void view_mark_as_read(Store::Id docid, Message&& msg, bool rename); void view_mark_as_read(Store::Id docid, Message&& msg, bool rename);
/// Create the OutputStream matching the server options: file-backed
/// (in tmp_dir_) when temp files are allowed, string-backed otherwise.
OutputStream make_output_stream() const {
	if (!options_.allow_temp_file)
		return OutputStream{};

	return OutputStream{tmp_dir_};
}
std::ofstream make_temp_file_stream(std::string& fname) const; std::ofstream make_temp_file_stream(std::string& fname) const;
Store& store_; Store& store_;
@ -413,8 +492,9 @@ Server::Private::invoke(const std::string& expr) noexcept
/* 'add' adds a message to the database, and takes two parameters: 'path', which /* 'add' adds a message to the database, and takes two parameters: 'path', which
* is the full path to the message, and 'maildir', which is the maildir this * is the full path to the message, and 'maildir', which is the maildir this
* message lives in (e.g. "/inbox"). response with an (:info ...) message with * message lives in (e.g. "/inbox").
* information about the newly added message (details: see code below) *
* responds with an (added . <message sexp>) for the new message
*/ */
void void
Server::Private::add_handler(const Command& cmd) Server::Private::add_handler(const Command& cmd)
@ -523,19 +603,6 @@ Server::Private::compose_handler(const Command& cmd)
output_sexp(comp_lst); output_sexp(comp_lst);
} }
// Open a new temp file in tmp_dir_ and return a stream for writing to it;
// the path of the file is stored in fname. The name is an out-parameter
// rather than part of a returned tuple, see:
// https://stackoverflow.com/questions/46114214/lambda-implicit-capture-fails-with-variable-declared-from-structured-binding
//
// Throws Mu::Error (Error::Code::File) when the stream is not good after
// opening; fname has been set by then.
std::ofstream
Server::Private::make_temp_file_stream(std::string& fname) const
{
	fname = join_paths(tmp_dir_, mu_format("mu-{}.eld", g_get_monotonic_time()));

	std::ofstream stream{fname, std::ios::out};
	if (stream.good())
		return stream;

	throw Mu::Error{Error::Code::File, "failed to create temp-file"};
}
void void
Server::Private::contacts_handler(const Command& cmd) Server::Private::contacts_handler(const Command& cmd)
{ {
@ -562,37 +629,18 @@ Server::Private::contacts_handler(const Command& cmd)
}; };
auto n{0}; auto n{0};
if (options_.allow_temp_file) { auto&& out{make_output_stream()};
// fast way: put all the contacts in a temp-file mu_print(out, "(");
// structured bindings / lambda don't work with some clang. store().contacts_cache().for_each([&](const Contact& ci) {
std::string tmp_file_name; if (!match_contact(ci))
auto&& tmp_file = make_temp_file_stream(tmp_file_name); return true; // continue
mu_println(out.out(), "{}", quote(ci.display_name()));
mu_print(tmp_file, "("); ++n;
store().contacts_cache().for_each([&](const Contact& ci) { return maxnum == 0 || n < maxnum;
if (!match_contact(ci)) });
return true; // continue mu_print(out, ")");
mu_println(tmp_file, "{}", quote(ci.display_name())); output(mu_format("(:contacts {}\n:tstamp \"{}\")",
++n; out.to_string(), g_get_monotonic_time()));
return maxnum == 0 || n < maxnum;
});
mu_print(tmp_file, ")");
output_sexp(Sexp{":tstamp"_sym, mu_format("\"{}\"", g_get_monotonic_time()),
":contacts-temp-file"_sym, tmp_file_name});
} else {
Sexp contacts;
store().contacts_cache().for_each([&](const Contact& ci) {
if (!match_contact(ci))
return true; // continue;
contacts.add(ci.display_name());
++n;
return maxnum == 0 || n < maxnum;
});
Sexp seq;
seq.put_props(":contacts", std::move(contacts),
":tstamp", mu_format("\"{}\"", g_get_monotonic_time()));
output_sexp(seq);
}
mu_debug("sent {} of {} contact(s)", n, store().contacts_cache().size()); mu_debug("sent {} of {} contact(s)", n, store().contacts_cache().size());
} }
@ -651,52 +699,13 @@ determine_docids(const Store& store, const Command& cmd)
size_t size_t
Server::Private::output_results(const QueryResults& qres, size_t batch_size) const Server::Private::output_results(const QueryResults& qres, size_t batch_size) const
{
size_t n{};
Sexp headers;
const auto output_batch = [&](Sexp&& hdrs) {
Sexp batch;
batch.put_props(":headers", std::move(hdrs));
output_sexp(batch);
};
for (auto&& mi : qres) {
auto msg{mi.message()};
if (!msg)
continue;
++n;
// construct sexp for a single header.
auto qm{mi.query_match()};
auto msgsexp{unwrap(Sexp::parse(msg_sexp_str(*msg, mi.doc_id(), qm)))};
headers.add(std::move(msgsexp));
// we output up-to-batch-size lists of messages. It's much
// faster (on the emacs side) to handle such batches than single
// headers.
if (headers.size() % batch_size == 0) {
output_batch(std::move(headers));
headers.clear();
};
}
// remaining.
if (!headers.empty())
output_batch(std::move(headers));
return n;
}
size_t
Server::Private::output_results_temp_file(const QueryResults& qres, size_t batch_size) const
{ {
// create an output stream with a file name // create an output stream with a file name
size_t n{}; size_t n{}, batch_n{};
auto&& out{make_output_stream()};
// structured bindings / lambda don't work with some clang. // structured bindings / lambda don't work with some clang.
std::string tmp_file_name;
auto&& tmp_file{make_temp_file_stream(tmp_file_name)};
mu_print(tmp_file, "("); mu_print(out, "(");
for(auto&& mi: qres) { for(auto&& mi: qres) {
auto msg{mi.message()}; auto msg{mi.message()};
@ -704,19 +713,27 @@ Server::Private::output_results_temp_file(const QueryResults& qres, size_t batch
continue; continue;
auto qm{mi.query_match()}; // construct sexp for a single header. auto qm{mi.query_match()}; // construct sexp for a single header.
tmp_file << msg_sexp_str(*msg, mi.doc_id(), qm) << '\n'; mu_println(out, "{}", msg_sexp_str(*msg, mi.doc_id(), qm));
++n; ++n;
++batch_n;
if (n % batch_size == 0) { if (n % batch_size == 0) {
mu_print(")"); // batch complete
batch_size = 1000; mu_print(out, ")");
output_sexp(Sexp{":headers-temp-file"_sym, tmp_file_name}); batch_size = 5000;
tmp_file = make_temp_file_stream(tmp_file_name); output(mu_format("(:headers {})", out.to_string()));
mu_print(tmp_file, "("); batch_n = 0;
// start a new batch
out = make_output_stream();
mu_print(out, "(");
} }
} }
mu_print(tmp_file, ")");
output_sexp(Sexp{":headers-temp-file"_sym, tmp_file_name}); mu_print(out, ")");
if (batch_n > 0)
output(mu_format("(:headers {})", out.to_string()));
else
out.unlink();
return n; return n;
} }
@ -774,8 +791,7 @@ Server::Private::find_handler(const Command& cmd)
* output of two finds will not be mixed. */ * output of two finds will not be mixed. */
output_sexp(Sexp().put_props(":erase", Sexp::t_sym)); output_sexp(Sexp().put_props(":erase", Sexp::t_sym));
const auto bsize{static_cast<size_t>(batch_size)}; const auto bsize{static_cast<size_t>(batch_size)};
const auto foundnum =options_.allow_temp_file ? const auto foundnum = output_results(*qres, bsize);
output_results_temp_file(*qres, bsize) : output_results(*qres, bsize);
output_sexp(Sexp().put_props(":found", foundnum)); output_sexp(Sexp().put_props(":found", foundnum));
} }