Couchbase C Client  2.10.7
Asynchronous C Client for Couchbase
example/tracing/views.c

Shows tracing for HTTP APIs of the cluster (e.g. Couchbase Views)

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2018 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <libcouchbase/api3.h>
#include <libcouchbase/views.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h> /* strlen */
#ifdef _WIN32
#define PRIx64 "I64x"
#define PRId64 "I64d"
#else
#include <inttypes.h>
#endif
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "cJSON.h"
#define COMPONENT_NAME "demo"
/* One serialized span waiting to be sent; nodes form a singly linked list. */
typedef struct zipkin_payload zipkin_payload;
struct zipkin_payload {
    char *data;           /* JSON text of the span */
    zipkin_payload *next; /* following queued span, or NULL at the tail */
};
/* Per-tracer state stored in lcbtrace_TRACER::cookie by zipkin_new(). */
struct zipkin_state {
    char *json_api_host; /* host name used to reach the Zipkin JSON API */
    char *json_api_port; /* port (as a string) of the Zipkin JSON API */
    /* [0, 100], where 0 is "never", 100 is "always" */
    int sample_rate;
    zipkin_payload *root;  /* head of the list of queued spans */
    zipkin_payload *last;  /* tail of the list of queued spans */
    size_t content_length; /* running byte count of the queued JSON, see zipkin_report() */
};
typedef struct zipkin_state zipkin_state;
/**
 * Destructor installed on tracers created by zipkin_new().
 *
 * Releases every span payload that is still queued (i.e. reported but not
 * yet sent by zipkin_flush()), then the zipkin_state stored in the cookie,
 * and finally the tracer structure itself.
 *
 * @param tracer tracer to destroy; NULL is accepted and ignored
 */
void zipkin_destructor(lcbtrace_TRACER *tracer)
{
    zipkin_state *state;

    if (tracer == NULL) {
        return;
    }
    state = tracer->cookie;
    if (state != NULL) {
        /* Drop payloads that were never flushed so they do not leak. */
        zipkin_payload *node = state->root;
        while (node != NULL) {
            zipkin_payload *next = node->next;
            free(node->data);
            free(node);
            node = next;
        }
        free(state);
        tracer->cookie = NULL;
    }
    free(tracer);
}
void zipkin_report(lcbtrace_TRACER *tracer, lcbtrace_SPAN *span)
{
zipkin_state *state = NULL;
if (tracer == NULL) {
return;
}
state = tracer->cookie;
if (state == NULL) {
return;
}
if (rand() % 100 > state->sample_rate) {
return;
}
{
#define BUFSZ 1000
size_t nbuf = BUFSZ;
char *buf;
lcbtrace_SPAN *parent;
uint64_t start;
zipkin_payload *payload = calloc(1, sizeof(zipkin_payload));
cJSON *json = cJSON_CreateObject();
buf = calloc(nbuf, sizeof(char));
cJSON_AddItemToObject(json, "name", cJSON_CreateString(lcbtrace_span_get_operation(span)));
snprintf(buf, nbuf, "%" PRIx64, lcbtrace_span_get_span_id(span));
cJSON_AddItemToObject(json, "id", cJSON_CreateString(buf));
snprintf(buf, nbuf, "%" PRIx64, lcbtrace_span_get_trace_id(span));
cJSON_AddItemToObject(json, "traceId", cJSON_CreateString(buf));
parent = lcbtrace_span_get_parent(span);
if (parent) {
snprintf(buf, nbuf, "%" PRIx64, lcbtrace_span_get_trace_id(parent));
cJSON_AddItemToObject(json, "parentId", cJSON_CreateString(buf));
}
cJSON_AddItemToObject(json, "timestamp", cJSON_CreateNumber(start));
cJSON_AddItemToObject(json, "duration", cJSON_CreateNumber(lcbtrace_span_get_finish_ts(span) - start));
{
cJSON *endpoint = cJSON_CreateObject();
nbuf = BUFSZ;
if (lcbtrace_span_get_tag_str(span, LCBTRACE_TAG_DB_TYPE, &buf, &nbuf) == LCB_SUCCESS) {
buf[nbuf] = '\0';
cJSON_AddItemToObject(endpoint, "serviceName", cJSON_CreateString(buf));
}
cJSON_AddItemToObject(json, "localEndpoint", endpoint);
}
{
cJSON *tags = cJSON_CreateObject();
uint64_t latency, operation_id;
cJSON_AddItemToObject(tags, LCBTRACE_TAG_PEER_LATENCY, cJSON_CreateNumber(latency));
}
cJSON_AddItemToObject(tags, LCBTRACE_TAG_OPERATION_ID, cJSON_CreateNumber(operation_id));
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
cJSON_AddItemToObject(tags, LCBTRACE_TAG_COMPONENT, cJSON_CreateString(buf));
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
cJSON_AddItemToObject(tags, LCBTRACE_TAG_PEER_ADDRESS, cJSON_CreateString(buf));
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
cJSON_AddItemToObject(tags, LCBTRACE_TAG_LOCAL_ADDRESS, cJSON_CreateString(buf));
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
cJSON_AddItemToObject(tags, LCBTRACE_TAG_DB_INSTANCE, cJSON_CreateString(buf));
}
if (cJSON_GetArraySize(tags) > 0) {
cJSON_AddItemToObject(json, "tags", tags);
} else {
cJSON_Delete(tags);
}
}
free(buf);
payload->data = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
if (state->last) {
state->last->next = payload;
}
state->last = payload;
state->content_length += strlen(payload->data) + 1; /* for comma/closing bracket */
if (state->root == NULL) {
state->root = payload;
}
}
}
/**
 * Writes exactly nbytes bytes from bytes to sock, calling send() again for
 * the remainder whenever it accepts fewer bytes than requested.
 *
 * On a send() error the message is printed with perror() and the process
 * exits with EXIT_FAILURE.
 *
 * @param sock   connected socket descriptor
 * @param bytes  data to transmit
 * @param nbytes number of bytes to transmit
 */
void loop_send(int sock, char *bytes, ssize_t nbytes)
{
    for (;;) {
        ssize_t sent = send(sock, bytes, nbytes, 0);

        if (sent < 0) {
            perror("failed to send data to zipkin: ");
            exit(EXIT_FAILURE);
        }
        if (sent >= nbytes) {
            return;
        }
        /* short write: advance past what was accepted and try again */
        bytes += sent;
        nbytes -= sent;
    }
}
/**
 * Sends all queued span payloads to the Zipkin JSON API and empties the queue.
 *
 * Resolves state->json_api_host / state->json_api_port, connects to the first
 * address that accepts a connection, and issues a single
 * "POST /api/v2/spans" request whose body is a JSON array of the queued
 * payloads. Each payload is freed as soon as it has been written. The
 * response is not read; the socket is closed right after the body.
 *
 * Does nothing if tracer is NULL, carries no state, or nothing is queued.
 * Any resolution, connection or send failure terminates the process with
 * EXIT_FAILURE.
 *
 * @param tracer tracer created by zipkin_new()
 */
void zipkin_flush(lcbtrace_TRACER *tracer)
{
    zipkin_state *state = NULL;
    int sock = -1, rv;

    if (tracer == NULL) {
        return;
    }
    state = tracer->cookie;
    if (state == NULL) {
        return;
    }
    if (state->root == NULL || state->content_length == 0) {
        return;
    }
    {
        struct addrinfo hints, *addr, *a;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        rv = getaddrinfo(state->json_api_host, state->json_api_port, &hints, &addr);
        if (rv != 0) {
            fprintf(stderr, "failed to resolve zipkin address getaddrinfo: %s\n", gai_strerror(rv));
            exit(EXIT_FAILURE);
        }
        for (a = addr; a != NULL; a = a->ai_next) {
            sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
            if (sock == -1) {
                perror("failed to create socket for zipkin: ");
                continue;
            }
            rv = connect(sock, a->ai_addr, a->ai_addrlen);
            if (rv == -1) {
                perror("failed to connect socket for zipkin: ");
                /* release the descriptor before trying the next address */
                close(sock);
                sock = -1;
                continue;
            }
            break;
        }
        if (a == NULL) {
            fprintf(stderr, "unable to connect to zipkin. terminating\n");
            exit(EXIT_FAILURE);
        }
        freeaddrinfo(addr);
    }
    {
        char preamble[1000] = "";
        size_t size;
        ssize_t sent;

        snprintf(preamble, sizeof(preamble),
                 "POST /api/v2/spans HTTP/1.1\r\n"
                 "Content-Type: application/json\r\n"
                 "Accept: */*\r\n"
                 "Connection: close\r\n"
                 "Host: %s:%s\r\n"
                 "Content-Length: %ld\r\n\r\n",
                 state->json_api_host, state->json_api_port, (long)state->content_length + 1 /* for open bracket */);
        size = strlen(preamble);
        sent = send(sock, preamble, size, 0);
        if (sent == -1) {
            perror("failed to send HTTP headers to zipkin: ");
            exit(EXIT_FAILURE);
        }
        if ((size_t)sent < size) {
            /* short write: push out the rest of the headers */
            loop_send(sock, preamble + sent, (ssize_t)(size - (size_t)sent));
        }
    }
    {
        /*
         * Body: '[' payload (',' payload)* ']'. content_length counted one
         * extra byte per payload (its trailing ',' or the final ']') and the
         * Content-Length header added one more for the '['.
         */
        zipkin_payload *ptr = state->root;

        loop_send(sock, "[", 1);
        while (ptr) {
            zipkin_payload *tmp = ptr;
            loop_send(sock, ptr->data, strlen(ptr->data));
            ptr = ptr->next;
            if (ptr) {
                loop_send(sock, ",", 1);
            }
            free(tmp->data);
            free(tmp);
        }
        loop_send(sock, "]", 1);
    }
    close(sock);
    state->root = state->last = NULL;
    state->content_length = 0;
}
/**
 * Allocates a tracer that queues spans for a Zipkin server.
 *
 * The tracer reports through zipkin_report() and is released by
 * zipkin_destructor(). Its cookie is a zipkin_state targeting
 * localhost:9411 with a sample rate of 100 and an empty queue.
 *
 * @return the new tracer, or NULL if memory could not be allocated
 */
lcbtrace_TRACER *zipkin_new(void)
{
    lcbtrace_TRACER *tracer = calloc(1, sizeof(lcbtrace_TRACER));
    zipkin_state *zipkin = calloc(1, sizeof(zipkin_state));

    if (tracer == NULL || zipkin == NULL) {
        free(tracer);
        free(zipkin);
        return NULL;
    }
    tracer->destructor = zipkin_destructor;
    tracer->flags = 0;
    tracer->version = 0;
    tracer->v.v0.report = zipkin_report;
    zipkin->json_api_host = "localhost";
    zipkin->json_api_port = "9411";
    zipkin->sample_rate = 100;
    zipkin->root = NULL;
    zipkin->last = NULL;
    zipkin->content_length = 0;
    tracer->cookie = zipkin;
    return tracer;
}
/*
 * Prints msg together with the numeric error code and the text returned by
 * lcb_strerror() to stderr, then exits with EXIT_FAILURE. Does not return.
 */
static void die(lcb_t instance, const char *msg, lcb_error_t err)
{
    const char *reason = lcb_strerror(instance, err);

    fprintf(stderr, "%s. Received code 0x%X (%s)\n", msg, err, reason);
    exit(EXIT_FAILURE);
}
/*
 * Response callback installed in main() for LCB_CALLBACK_GET and
 * LCB_CALLBACK_STORE. Prints the callback type, then key and CAS of a
 * successful response (plus value and flags for GET) to stderr. A failed
 * response goes to die(), which terminates the process.
 */
static void op_callback(lcb_t instance, int cbtype, const lcb_RESPBASE *rb)
{
    fprintf(stderr, "=== %s ===\n", lcb_strcbtype(cbtype));

    if (rb->rc != LCB_SUCCESS) {
        die(instance, lcb_strcbtype(cbtype), rb->rc);
        return;
    }

    fprintf(stderr, "KEY: %.*s\n", (int)rb->nkey, rb->key);
    fprintf(stderr, "CAS: 0x%" PRIx64 "\n", rb->cas);
    if (cbtype != LCB_CALLBACK_GET) {
        return;
    }

    {
        const lcb_RESPGET *get_resp = (const lcb_RESPGET *)rb;

        fprintf(stderr, "VALUE: %.*s\n", (int)get_resp->nvalue, get_resp->value);
        fprintf(stderr, "FLAGS: 0x%x\n", get_resp->itmflags);
    }
}
/*
 * Row callback for the view query issued in main().
 *
 * A response flagged LCB_RESP_F_FINAL has its value printed as the view
 * metadata. Every other response is printed as a row (status, document ID,
 * key), followed by status and CAS of the attached document response when
 * rv->docresp is set.
 */
static void view_callback(lcb_t instance, int cbtype, const lcb_RESPVIEWQUERY *rv)
{
    if (!(rv->rflags & LCB_RESP_F_FINAL)) {
        printf("Got row callback from LCB: RC=0x%X, DOCID=%.*s. KEY=%.*s\n", rv->rc, (int)rv->ndocid, rv->docid,
               (int)rv->nkey, rv->key);
        if (rv->docresp) {
            printf(" Document for response. RC=0x%X. CAS=0x%" PRIx64 "\n", rv->docresp->rc, rv->docresp->cas);
        }
        return;
    }

    printf("*** META FROM VIEWS ***\n");
    fprintf(stderr, "%.*s\n", (int)rv->nvalue, rv->value);
}
int main(int argc, char *argv[])
{
lcb_t instance;
struct lcb_create_st create_options = {0};
lcb_CMDSTORE scmd = {0};
lcb_CMDGET gcmd = {0};
lcbtrace_SPAN *span = NULL;
lcbtrace_TRACER *tracer = NULL;
create_options.version = 3;
if (argc < 2) {
fprintf(stderr, "Usage: %s couchbase://host/bucket [ password [ username ] ]\n", argv[0]);
exit(EXIT_FAILURE);
}
create_options.v.v3.connstr = argv[1];
if (argc > 2) {
create_options.v.v3.passwd = argv[2];
}
if (argc > 3) {
create_options.v.v3.username = argv[3];
}
srand(time(NULL));
err = lcb_create(&instance, &create_options);
if (err != LCB_SUCCESS) {
die(NULL, "Couldn't create couchbase handle", err);
}
err = lcb_connect(instance);
if (err != LCB_SUCCESS) {
die(instance, "Couldn't schedule connection", err);
}
lcb_wait(instance);
err = lcb_get_bootstrap_status(instance);
if (err != LCB_SUCCESS) {
die(instance, "Couldn't bootstrap from cluster", err);
}
/* Assign the handlers to be called for the operation types */
lcb_install_callback3(instance, LCB_CALLBACK_GET, op_callback);
lcb_install_callback3(instance, LCB_CALLBACK_STORE, op_callback);
tracer = zipkin_new();
lcb_set_tracer(instance, tracer);
span = lcbtrace_span_start(tracer, "transaction", 0, NULL);
{
int encoding_time_us = rand() % 1000;
lcbtrace_SPAN *encoding;
lcbtrace_REF ref;
ref.type = LCBTRACE_REF_CHILD_OF;
ref.span = span;
encoding = lcbtrace_span_start(tracer, LCBTRACE_OP_REQUEST_ENCODING, 0, &ref);
usleep(encoding_time_us);
}
lcb_CMDVIEWQUERY vcmd = {0};
char *doc_name = "beer";
char *view_name = "by_location";
char *options = "reduce=false&limit=3";
vcmd.callback = view_callback;
vcmd.ddoc = doc_name;
vcmd.nddoc = strlen(doc_name);
vcmd.view = view_name;
vcmd.nview = strlen(view_name);
vcmd.optstr = options;
vcmd.noptstr = strlen(options);
vcmd.handle = &handle;
err = lcb_view_query(instance, NULL, &vcmd);
if (err != LCB_SUCCESS) {
die(instance, "Couldn't schedule view operation", err);
}
lcb_view_set_parent_span(instance, handle, span);
/* The store_callback is invoked from lcb_wait() */
fprintf(stderr, "Will wait for view operation to complete..\n");
lcb_wait(instance);
zipkin_flush(tracer);
/* Now that we're all done, close down the connection handle */
lcb_destroy(instance);
return 0;
}
lcb_CMDGET
Command for retrieving a single item.
Definition: couchbase.h:840
LCBTRACE_TAG_OPERATION_ID
#define LCBTRACE_TAG_OPERATION_ID
The unique ID of the operation.
Definition: tracing.h:243
lcb_CMDVIEWQUERY::nview
size_t nview
Length of the view name.
Definition: views.h:85
lcbtrace_span_get_span_id
lcb_U64 lcbtrace_span_get_span_id(lcbtrace_SPAN *span)
Get ID of the span.
lcbtrace_span_get_tag_uint64
lcb_error_t lcbtrace_span_get_tag_uint64(lcbtrace_SPAN *span, const char *name, lcb_U64 *value)
Get value of the integer tag of the span.
lcb_wait
lcb_error_t lcb_wait(lcb_t instance)
Wait for the execution of all batched requests.
lcb_strerror
const char * lcb_strerror(lcb_t instance, lcb_error_t error)
Get a textual description for the given error code.
lcb_RESPBASE::nkey
lcb_SIZE nkey
Size of key.
Definition: couchbase.h:626
lcbtrace_span_get_operation
const char * lcbtrace_span_get_operation(lcbtrace_SPAN *span)
Get operation code of the span.
lcb_RESPGET::value
const void * value
Value buffer for the item.
Definition: couchbase.h:854
lcbtrace_TRACER
Tracer interface.
Definition: tracing.h:57
lcb_view_query
lcb_error_t lcb_view_query(lcb_t instance, const void *cookie, const lcb_CMDVIEWQUERY *cmd)
lcb_create_st3::passwd
const char * passwd
Password for bucket.
Definition: couchbase.h:293
LCB_CMDVIEWQUERY_F_INCLUDE_DOCS
#define LCB_CMDVIEWQUERY_F_INCLUDE_DOCS
Set this flag to execute an actual get with each response.
Definition: views.h:63
lcbtrace_TRACER::destructor
void(* destructor)(struct lcbtrace_TRACER *tracer)
destructor function or NULL, if it is not necessary
Definition: tracing.h:61
lcb_get_bootstrap_status
lcb_error_t lcb_get_bootstrap_status(lcb_t instance)
Gets the initial bootstrap status.
lcb_CMDVIEWQUERY::ddoc
const char * ddoc
The design document as a string; e.g.
Definition: views.h:78
lcb_create_st3::connstr
const char * connstr
Connection string.
Definition: couchbase.h:282
lcb_connect
lcb_error_t lcb_connect(lcb_t instance)
Schedule the initial connection. This function will schedule the initial connection for the handle.
lcb_create_st3::username
const char * username
Username to use for authentication.
Definition: couchbase.h:288
lcb_VIEWHANDLE
struct lcbview_REQUEST_st * lcb_VIEWHANDLE
Pointer for request instance.
Definition: views.h:43
lcb_CMDVIEWQUERY::handle
lcb_VIEWHANDLE * handle
If not NULL, this will be set to a handle which may be passed to lcb_view_cancel().
Definition: views.h:123
lcb_RESPGET::nvalue
lcb_SIZE nvalue
Length of value.
Definition: couchbase.h:855
LCBTRACE_TAG_LOCAL_ADDRESS
#define LCBTRACE_TAG_LOCAL_ADDRESS
The local socket hostname / IP and port, in the format: {hostname}:{port} To be added to dispatch spa...
Definition: tracing.h:260
lcbtrace_span_get_start_ts
lcb_U64 lcbtrace_span_get_start_ts(lcbtrace_SPAN *span)
Get start timestamp of the span.
lcb_CMDVIEWQUERY::view
const char * view
The name of the view as a string; e.g.
Definition: views.h:83
lcb_error_t
lcb_error_t
Error codes returned by the library.
Definition: error.h:476
LCB_CALLBACK_STORE
@ LCB_CALLBACK_STORE
lcb_store3()
Definition: couchbase.h:698
lcbtrace_span_get_parent
lcbtrace_SPAN * lcbtrace_span_get_parent(lcbtrace_SPAN *span)
Get parent span of the span.
lcb_CMDVIEWQUERY::optstr
const char * optstr
Any URL parameters to be passed to the view should be specified here.
Definition: views.h:94
lcb_RESPBASE::cas
lcb_CAS cas
CAS for response (if applicable)
Definition: couchbase.h:626
lcbtrace_span_get_tag_str
lcb_error_t lcbtrace_span_get_tag_str(lcbtrace_SPAN *span, const char *name, char **value, size_t *nvalue)
Get value of the string tag of the span.
lcb_RESPBASE::rc
lcb_error_t rc
Status code.
Definition: couchbase.h:626
lcb_RESPGET::itmflags
lcb_U32 itmflags
User-defined flags for the item.
Definition: couchbase.h:858
lcb_set_tracer
void lcb_set_tracer(lcb_t instance, lcbtrace_TRACER *tracer)
Set current tracer for the connection.
lcb_create_st::lcb_CRST_u::v3
struct lcb_create_st3 v3
Use this field.
Definition: couchbase.h:338
LCBTRACE_TAG_PEER_ADDRESS
#define LCBTRACE_TAG_PEER_ADDRESS
The remote socket hostname / IP and port, in the format: {hostname}:{port} To be added to dispatch sp...
Definition: tracing.h:265
LCBTRACE_TAG_COMPONENT
#define LCBTRACE_TAG_COMPONENT
The client's identifier string (the 'u' property in the updated HELLO request), the same one that is ...
Definition: tracing.h:239
lcb_strcbtype
const char * lcb_strcbtype(int cbtype)
Returns the type of the callback as a string.
lcb_create_st
Wrapper structure for lcb_create()
Definition: couchbase.h:328
lcbtrace_span_start
lcbtrace_SPAN * lcbtrace_span_start(lcbtrace_TRACER *tracer, const char *operation, lcb_U64 now, lcbtrace_REF *ref)
Start span.
lcb_CMDVIEWQUERY::noptstr
size_t noptstr
Length of the option string.
Definition: views.h:97
lcb_t
struct lcb_st * lcb_t
Definition: couchbase.h:41
lcbtrace_TRACER::flags
lcb_U64 flags
tracer-specific flags
Definition: tracing.h:59
LCB_CALLBACK_GET
@ LCB_CALLBACK_GET
lcb_get3()
Definition: couchbase.h:697
lcb_CMDVIEWQUERY::cmdflags
lcb_U32 cmdflags
Common command flags; e.g.
Definition: views.h:75
lcb_CMDVIEWQUERY::callback
lcb_VIEWQUERYCALLBACK callback
Callback to invoke for each row.
Definition: views.h:119
lcb_RESPGET
Response structure when retrieving a single item.
Definition: couchbase.h:852
lcb_CMDVIEWQUERY
Command structure for querying a view.
Definition: views.h:73
lcb_view_set_parent_span
void lcb_view_set_parent_span(lcb_t instance, lcb_VIEWHANDLE handle, lcbtrace_SPAN *span)
Associate parent tracing span with the View request.
lcb_create
lcb_error_t lcb_create(lcb_t *instance, const struct lcb_create_st *options)
Create an instance of lcb.
couchbase.h
LCB_SUCCESS
@ LCB_SUCCESS
Success.
Definition: error.h:478
lcbtrace_span_get_trace_id
lcb_U64 lcbtrace_span_get_trace_id(lcbtrace_SPAN *span)
Get trace ID of the span.
lcbtrace_span_get_finish_ts
lcb_U64 lcbtrace_span_get_finish_ts(lcbtrace_SPAN *span)
Get finish timestamp of the span.
lcb_RESPBASE
Base response structure for callbacks.
Definition: couchbase.h:625
lcb_destroy
void lcb_destroy(lcb_t instance)
Destroy (and release all allocated resources) an instance of lcb.
lcbtrace_TRACER::cookie
void * cookie
opaque pointer (e.g.
Definition: tracing.h:60
LCBTRACE_TAG_PEER_LATENCY
#define LCBTRACE_TAG_PEER_LATENCY
The server duration with precision suffix.
Definition: tracing.h:272
lcb_CMDVIEWQUERY::nddoc
size_t nddoc
Length of design document name.
Definition: views.h:80
lcbtrace_TRACER::version
lcb_U16 version
version of the structure, current value is 0
Definition: tracing.h:58
lcbtrace_span_add_tag_str
void lcbtrace_span_add_tag_str(lcbtrace_SPAN *span, const char *name, const char *value)
Add string tag to span.
lcb_create_st::version
int version
Indicates which field in the lcb_CRST_u union should be used.
Definition: couchbase.h:330
LCBTRACE_NOW
#define LCBTRACE_NOW
zero means the library will trigger timestamp automatically
Definition: tracing.h:120
LCB_RESP_F_FINAL
@ LCB_RESP_F_FINAL
No more responses are to be received for this request.
Definition: couchbase.h:658
lcbtrace_span_finish
void lcbtrace_span_finish(lcbtrace_SPAN *span, lcb_U64 now)
Mark the span as finished.
lcb_install_callback3
lcb_RESPCALLBACK lcb_install_callback3(lcb_t instance, int cbtype, lcb_RESPCALLBACK cb)
lcb_CMDSTORE
Command for storing an item to the server.
Definition: couchbase.h:1090
lcb_RESPBASE::key
const void * key
Key for request.
Definition: couchbase.h:626
LCBTRACE_TAG_DB_INSTANCE
#define LCBTRACE_TAG_DB_INSTANCE
Bucket name.
Definition: tracing.h:234