#include <fmt/chrono.h>
#include <fmt/format.h>
#include <tao/json.hpp>
#include <system_error>
#include <tao/json/from_string.hpp>
#include <thread>
static constexpr auto connection_string{ "couchbase://127.0.0.1" };
static constexpr auto username{ "Administrator" };
static constexpr auto password{ "password" };
static constexpr auto bucket_name{ "default" };
{
public:
:
couchbase::best_effort_retry_strategy(calculator)
, calculator_{ calculator }
{
}
{
auto backoff_duration = calculator_(request.retry_attempts());
fmt::print("retrying in {} because of \"key_value_locked\", attempt {}\n",
backoff_duration,
request.retry_attempts());
}
}
private:
};
class couchbase_mutex
{
public:
static constexpr std::chrono::seconds default_lock_duration{ 15 };
static constexpr std::chrono::seconds default_timeout{ 10 };
std::string document_id,
std::chrono::seconds lock_duration = default_lock_duration,
std::chrono::seconds timeout = default_timeout)
: collection_{ std::move(collection) }
, document_id_{ std::move(document_id) }
, lock_duration_{ lock_duration }
{
auto [err, resp] = collection_.upsert(document_id_, content_, options).get();
std::size_t retry_attempts = retry_attemps_from_context(err.ctx().to_json());
if (err.ec()) {
throw std::system_error(
err.ec(),
fmt::format(R"(unable to create mutex "{}" (retries: {}))", document_id_, retry_attempts));
}
cas_ = resp.cas();
fmt::print("[created ] \"{}\", cas: {}, retries: {}, lock_duration: {}\n",
document_id_,
cas_.value(),
retry_attempts,
lock_duration);
}
void lock()
{
std::scoped_lock lock(mutex_);
auto options =
auto [err, resp] = collection_.get_and_lock(document_id_, lock_duration_, options).get();
std::size_t retry_attempts = retry_attemps_from_context(err.ctx().to_json());
if (err) {
throw std::system_error(
err.ec(),
fmt::format(R"(unable to lock mutex "{}" (retries: {}))", document_id_, retry_attempts));
}
cas_ = resp.cas();
fmt::print(
"[locked ] \"{}\", cas: {}, retries: {}\n", document_id_, cas_.value(), retry_attempts);
}
void unlock()
{
std::scoped_lock lock(mutex_);
auto err = collection_.unlock(document_id_, cas_, options).get();
std::size_t retry_attempts = retry_attemps_from_context(err.ctx().to_json());
if (err) {
throw std::system_error(
err.ec(),
fmt::format(R"(unable to unlock mutex "{}" (retries: {}))", document_id_, retry_attempts));
}
fmt::print(
"[unlocked] \"{}\", cas: {}, retries: {}\n", document_id_, cas_.value(), retry_attempts);
}
private:
static auto retry_attemps_from_context(const std::string& context_json) -> std::size_t
{
auto ctx = tao::json::from_string(context_json);
if (const auto* attempts = ctx.find("retry_attempts"); attempts != nullptr) {
return attempts->get_unsigned();
}
return 0;
}
std::string document_id_;
std::chrono::seconds lock_duration_;
std::chrono::seconds timeout_;
const std::string content_{ "__couchbase_mutex__" };
mutable std::mutex mutex_{};
std::shared_ptr<lock_aware_retry_strategy> retry_strategy_{
};
};
int
main()
{
options.apply_profile("wan_development");
auto collection = cluster.
bucket(bucket_name).scope(scope_name).collection(collection_name);
auto writer_id =
fmt::format("thread:{}", std::hash<std::thread::id>()(std::this_thread::get_id()));
couchbase_mutex mutex(collection, "demo_mutex");
{
std::scoped_lock lock(mutex);
const std::string document_id{ "order:42" };
const tao::json::value basic_doc{ { "type", "book" },
{ "name", "Alice in Wonderland" },
{ "author", "Lewis Carroll" },
{ "price_usd", 4.0 },
{ "writer_id", writer_id } };
auto [err, resp] = collection.
upsert(document_id, basic_doc, {}).get();
fmt::print("[stored ] \"{}\", ec: {}, id: \"{}\", CAS: {}, writer_id: \"{}\"\n",
document_id,
err.ec() ? err.ec().message() : "success",
document_id,
resp.cas().value(),
writer_id);
fmt::print(
stderr,
"[wait ] pretend to do some work for 7 seconds (distributed mutex still locked)\n");
std::this_thread::sleep_for(std::chrono::seconds{ 7 });
}
cluster.close().get();
return 0;
}
Definition best_effort_retry_strategy.hxx:50
auto retry_after(const retry_request &request, retry_reason reason) -> retry_action override
CAS is a special type that represented in protocol using unsigned 64-bit integer, but only equality c...
Definition cas.hxx:34
Definition cluster_options.hxx:44
static void connect(const std::string &connection_string, const cluster_options &options, cluster_connect_handler &&handler)
Connect to a Couchbase cluster.
The collection provides access to all collection APIs.
Definition collection.hxx:70
friend class bucket
Definition collection.hxx:1083
static constexpr auto default_name
Constant for the name of the default collection in the bucket.
Definition collection.hxx:78
void upsert(std::string document_id, codec::encoded_value document, const upsert_options &options, upsert_handler &&handler) const
Upserts an encoded body of the document which might or might not exist yet, with custom options.
auto retry_strategy(const std::shared_ptr< retry_strategy > strategy) -> derived_class &
Specifies a custom couchbase::retry_strategy for this operation.
Definition common_options.hxx:66
auto timeout(const std::chrono::milliseconds timeout) -> derived_class &
Specifies a custom per-operation timeout.
Definition common_options.hxx:51
Definition retry_action.hxx:25
Definition retry_request.hxx:28
static constexpr auto default_name
Constant for the name of the default scope in the bucket.
Definition scope.hxx:55
Represents a single item from the result of collection::scan()
Definition allow_querying_search_index_options.hxx:28
auto controlled_backoff(std::size_t retry_attempts) -> std::chrono::milliseconds
calculates a backoff time duration from the retry attempts on a given request.
retry_reason
Enumeration of possible retry reasons for operations.
Definition retry_reason.hxx:28
std::function< std::chrono::milliseconds(std::size_t retry_attempts)> backoff_calculator
Definition best_effort_retry_strategy.hxx:28
Options for collection::get_and_lock().
Definition get_and_lock_options.hxx:37
Options for collection::unlock().
Definition unlock_options.hxx:40
Options for collection::upsert().
Definition upsert_options.hxx:41