The representation of the document in context of distributed transaction.
By default transactions operate on JSON documents, that is a native encoding for Couchbase, but it is possible to use any other type, as long as its transcoder can encode it into bytestring.
The following example shows how to use custom type with custom encoding in context of the transactions.
We start with defining the type and its transcoder.
struct ledger_entry {
std::string description{};
std::string account{};
std::uint64_t debit{};
std::uint64_t credit{};
};
class ledger
{
public:
void add_record(const std::string& date,
const std::string& from_account,
const std::string& to_account,
std::uint64_t amount,
const std::string& description)
{
entries_.push_back({
description,
to_account,
amount,
0,
});
entries_.push_back({
description,
from_account,
0,
amount,
});
}
[[nodiscard]] auto entries() const -> const std::vector<ledger_entry>&
{
return entries_;
}
[[nodiscard]] auto to_csv() const -> std::vector<std::byte>
{
std::vector<std::byte> buffer;
byte_appender output(buffer);
fmt::format_to(output, "Date,Description,Account,Debit,Credit\n");
for (const auto& entry : entries_) {
fmt::format_to(output,
"{},{},{},{},{}\n",
entry.date,
entry.description,
entry.account,
entry.debit,
entry.credit);
}
return buffer;
}
static auto from_csv(const std::vector<std::byte>& blob) -> ledger
{
ledger ret;
std::istringstream input(std::string{
reinterpret_cast<const char*>(blob.data()),
blob.size(),
});
std::string line;
bool header_line{ true };
while (std::getline(input, line)) {
if (header_line) {
header_line = false;
continue;
}
std::istringstream line_stream(line);
ledger_entry entry;
std::getline(line_stream, entry.date, ',');
std::getline(line_stream, entry.description, ',');
std::getline(line_stream, entry.account, ',');
std::string field;
std::getline(line_stream, field, ',');
if (!field.empty()) {
entry.debit = std::stoul(field);
}
std::getline(line_stream, field, ',');
if (!field.empty()) {
entry.credit = std::stoul(field);
}
ret.entries_.push_back(entry);
}
return ret;
}
private:
std::vector<ledger_entry> entries_{};
};
struct csv_transcoder {
using document_type = ledger;
template<typename Document = document_type>
{
return {
};
}
template<typename Document = document_type>
{
if (encoded.flags == 0 &&
throw std::system_error(
"csv_transcoder expects document to have binary common flags, flags=" +
std::to_string(encoded.flags));
}
return Document::from_csv(encoded.data);
}
};
template<>
};
Then populate initial state of the system.
ledger initial_state;
initial_state.add_record("2024-08-30", "Accounts Receivable", "Cash", 1500, "Payment received");
auto [err, res] = collection.upsert<csv_transcoder, ledger>("the_ledger", initial_state).get();
if (err.ec()) {
fmt::print(
stderr,
"Create initial state of \"the_ledger\" has failed before starting transaction: {}\n",
err.ec().message());
return 1;
}
Now the actual transactional mutation of the document.
auto [tx_err, tx_res] = cluster.transactions()->run(
[=](std::shared_ptr<couchbase::transactions::attempt_context> ctx) ->
couchbase::error {
auto [err_ctx, doc] = ctx->get(collection, "the_ledger");
if (err_ctx.ec()) {
fmt::print(stderr, "Failed to retrieve \"the_ledger\": {}\n", err_ctx.ec().message());
return {};
}
auto the_ledger = doc.content_as<ledger, csv_transcoder>();
the_ledger.add_record("2024-09-01", "Cash", "Expenses", 1000, "Rent payment");
ctx->replace<csv_transcoder, ledger>(doc, the_ledger);
return {};
});
if (tx_err.ec()) {
fmt::print(stderr,
"error in transaction {}, cause: {}\n",
tx_err.ec().message(),
tx_err.cause().has_value() ? tx_err.cause().value().ec().message() : "");
retval = 1;
} else {
fmt::print("transaction {} completed successfully\n", tx_res.transaction_id);
}