/* Shows tracing for HTTP APIs of the cluster (e.g. Couchbase Views). */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#ifdef _WIN32
#define PRIx64 "I64x"
#define PRId64 "I64d"
#else
#include <inttypes.h>
#endif
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "cJSON.h"
#define COMPONENT_NAME "demo"
/*
 * One queued span, held as a node of a singly linked list.
 *
 * `data` is the JSON text of the span (the reporting code stores the result
 * of cJSON_PrintUnformatted() here and the flushing code free()s it).
 * `next` points at the payload queued after this one, or is NULL for the
 * tail of the list.
 */
typedef struct zipkin_payload zipkin_payload;
struct zipkin_payload {
    char *data;
    zipkin_payload *next;
};
/*
 * Reporter state: where to send spans and the spans queued so far.
 *
 * json_api_host / json_api_port  Host and service strings handed to
 *                                getaddrinfo() and echoed in the HTTP Host
 *                                header when the queue is flushed.
 * sample_rate                    Compared against rand() % 100 when a span
 *                                is reported; the span is dropped when the
 *                                random value is greater.
 * root / last                    Head and tail of the queued payload list.
 * content_length                 Running total of strlen(data) + 1 over the
 *                                queued payloads; used to build the
 *                                Content-Length header.
 */
typedef struct zipkin_state zipkin_state;
struct zipkin_state {
    char *json_api_host;
    char *json_api_port;
    int sample_rate;
    zipkin_payload *root;
    zipkin_payload *last;
    size_t content_length;
};
/*
 * NOTE(review): the signature of this body is not visible in this view.
 * It releases `tracer` with free(), so it is presumably the tracer's
 * destructor -- confirm against the full file.
 */
{
if (tracer) {
/* NOTE(review): empty branch; together with the unbalanced closing braces
 * below this suggests statements are missing here -- confirm. */
}
free(tracer);
}
}
/*
 * Body of the span-report routine: serializes one span to JSON and appends
 * it to the payload queue kept in zipkin_state.
 *
 * NOTE(review): the function signature and a number of statements are not
 * visible in this view (braces are unbalanced and several variables are
 * used without a visible assignment). The comments below describe only
 * what the visible lines do.
 */
{
zipkin_state *state = NULL;
if (tracer == NULL) {
return;
}
/* NOTE(review): no visible line assigns `state` before this check, so as
 * shown the function always returns here -- presumably it is loaded from
 * the tracer in an elided line; confirm. */
if (state == NULL) {
return;
}
/* Sampling: drop the span when a random value in 0..99 exceeds sample_rate. */
if (rand() % 100 > state->sample_rate) {
return;
}
{
#define BUFSZ 1000
size_t nbuf = BUFSZ;
char *buf;
lcbtrace_SPAN *parent;
uint64_t start;
/* NOTE(review): neither calloc() result below is checked for NULL. */
zipkin_payload *payload = calloc(1, sizeof(zipkin_payload));
cJSON *json = cJSON_CreateObject();
buf = calloc(nbuf, sizeof(char));
/* NOTE(review): as shown, `buf` is still all zeroes here, so "id" and
 * "traceId" would be empty strings; the lines that format the ids are
 * presumably elided -- confirm. */
cJSON_AddItemToObject(json, "id", cJSON_CreateString(buf));
cJSON_AddItemToObject(json, "traceId", cJSON_CreateString(buf));
/* NOTE(review): `parent` and `start` are read without a visible
 * initialization -- confirm they are set in elided lines. */
if (parent) {
cJSON_AddItemToObject(json, "parentId", cJSON_CreateString(buf));
}
cJSON_AddItemToObject(json, "timestamp", cJSON_CreateNumber(start));
{
cJSON *endpoint = cJSON_CreateObject();
nbuf = BUFSZ;
/* NOTE(review): with nbuf == BUFSZ this index is one past the end of the
 * BUFSZ-byte allocation. Presumably an elided call stores a shorter length
 * in nbuf first -- confirm; the same pattern repeats below. */
buf[nbuf] = '\0';
cJSON_AddItemToObject(endpoint, "serviceName", cJSON_CreateString(buf));
}
cJSON_AddItemToObject(json, "localEndpoint", endpoint);
}
{
cJSON *tags = cJSON_CreateObject();
uint64_t latency, operation_id;
}
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
}
nbuf = BUFSZ;
buf[nbuf] = '\0';
}
/* Attach the tags object only when it has entries; otherwise discard it. */
if (cJSON_GetArraySize(tags) > 0) {
cJSON_AddItemToObject(json, "tags", tags);
} else {
cJSON_Delete(tags);
}
}
free(buf);
/* The printed JSON text is owned by the payload from here on; the cJSON
 * tree itself is no longer needed. */
payload->data = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
/* Append the payload at the tail of the queue. */
if (state->last) {
state->last->next = payload;
}
state->last = payload;
/* Each payload contributes its text length plus one extra byte to the
 * running total that later becomes the HTTP Content-Length. */
state->content_length += strlen(payload->data) + 1;
if (state->root == NULL) {
state->root = payload;
}
}
}
/**
 * Send exactly `nbytes` bytes starting at `bytes` over the socket `sock`.
 *
 * send() may accept fewer bytes than requested, so it is called again on
 * the unsent tail until everything has been handed over. A call that is
 * interrupted by a signal (errno == EINTR) is retried rather than treated
 * as a fatal error.
 *
 * @param sock   socket descriptor to write to
 * @param bytes  data to transmit; never modified
 * @param nbytes number of bytes to transmit; a value <= 0 sends nothing
 *
 * On any other send() failure the error is printed with perror() and the
 * process exits with EXIT_FAILURE, so the function only returns on success.
 */
void loop_send(int sock, const char *bytes, ssize_t nbytes)
{
    while (nbytes > 0) {
        ssize_t rv = send(sock, bytes, (size_t)nbytes, 0);
        if (rv < 0) {
            if (errno == EINTR) {
                /* Interrupted before any data was transferred: try again. */
                continue;
            }
            perror("failed to send data to zipkin: ");
            exit(EXIT_FAILURE);
        }
        /* Partial write: advance past what was accepted and keep going. */
        bytes += rv;
        nbytes -= rv;
    }
}
/*
 * Body of the flush routine: connects to the configured HTTP endpoint,
 * POSTs every queued span as one JSON array to /api/v2/spans, frees the
 * queue and resets the reporter state.
 *
 * Any resolution, connection or send failure terminates the process with
 * EXIT_FAILURE.
 *
 * NOTE(review): the function signature is not visible in this view, and no
 * visible line assigns `state`, so as shown the function returns at the
 * `state == NULL` check -- presumably it is loaded from the tracer in an
 * elided line; confirm.
 */
{
zipkin_state *state = NULL;
int sock, rv;
if (tracer == NULL) {
return;
}
if (state == NULL) {
return;
}
/* Nothing queued: nothing to send. */
if (state->root == NULL || state->content_length == 0) {
return;
}
{
/* Resolve host/port and connect to the first address that accepts. */
struct addrinfo hints, *addr, *a;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
rv = getaddrinfo(state->json_api_host, state->json_api_port, &hints, &addr);
if (rv != 0) {
fprintf(stderr, "failed to resolve zipkin address getaddrinfo: %s\n", gai_strerror(rv));
exit(EXIT_FAILURE);
}
for (a = addr; a != NULL; a = a->ai_next) {
sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
if (sock == -1) {
perror("failed to create socket for zipkin: ");
continue;
}
rv = connect(sock, a->ai_addr, a->ai_addrlen);
if (rv == -1) {
perror("failed to connect socket for zipkin: ");
/* NOTE(review): `sock` is not closed before moving on to the next
 * address, so the descriptor is leaked on this path. */
continue;
}
break;
}
/* The loop ran off the end of the list: no address could be connected. */
if (a == NULL) {
fprintf(stderr, "unable to connect to zipkin. terminating\n");
exit(EXIT_FAILURE);
}
freeaddrinfo(addr);
}
{
/*
 * HTTP request head. The body sent below is '[' + the payloads joined by
 * ',' + ']'. content_length already holds strlen(data) + 1 per payload, so
 * adding one more here covers the separators plus both brackets.
 */
char preamble[1000] = "";
size_t size;
snprintf(preamble, sizeof(preamble),
"POST /api/v2/spans HTTP/1.1\r\n"
"Content-Type: application/json\r\n"
"Accept: */*\r\n"
"Connection: close\r\n"
"Host: %s:%s\r\n"
"Content-Length: %ld\r\n\r\n",
state->json_api_host, state->json_api_port, (long)state->content_length + 1 );
size = strlen(preamble);
/* NOTE(review): a single send() is used here, so a short write of the
 * header would go unnoticed; the body below goes through loop_send(). */
rv = send(sock, preamble, size, 0);
if (rv == -1) {
perror("failed to send HTTP headers to zipkin: ");
exit(EXIT_FAILURE);
}
}
{
/* Stream the queue as a JSON array, freeing each node once it is sent. */
zipkin_payload *ptr = state->root;
loop_send(sock, "[", 1);
while (ptr) {
zipkin_payload *tmp = ptr;
loop_send(sock, ptr->data, strlen(ptr->data));
/* Advance before freeing so the separator decision uses the next node. */
ptr = ptr->next;
if (ptr) {
loop_send(sock, ",", 1);
}
free(tmp->data);
free(tmp);
}
loop_send(sock, "]", 1);
}
/* The socket is closed without reading the HTTP response. */
close(sock);
/* All nodes were freed above; reset the queue to empty. */
state->root = state->last = NULL;
state->content_length = 0;
}
/*
 * Body of the tracer constructor: allocates a zeroed zipkin_state, installs
 * zipkin_report as the tracer's report callback, fills in the default
 * endpoint (localhost:9411) and a sample rate of 100, and returns `tracer`.
 *
 * NOTE(review): the function signature and the declaration/creation of
 * `tracer` are not visible in this view, and no visible line attaches
 * `zipkin` to the tracer -- confirm against the full file.
 * NOTE(review): the calloc() result is used without a NULL check.
 */
{
zipkin_state *zipkin = calloc(1, sizeof(zipkin_state));
tracer->
v.v0.report = zipkin_report;
/* String literals are stored directly; they must not be free()d or written. */
zipkin->json_api_host = "localhost";
zipkin->json_api_port = "9411";
zipkin->sample_rate = 100;
/* Redundant after calloc(), but makes the empty-queue state explicit. */
zipkin->root = NULL;
zipkin->last = NULL;
zipkin->content_length = 0;
return tracer;
}
/*
 * Terminates the process with EXIT_FAILURE.
 *
 * NOTE(review): the signature is not visible in this view; main() calls
 * die(instance, message, err), so this is presumably that helper with its
 * diagnostic output elided -- confirm.
 */
{
exit(EXIT_FAILURE);
}
/*
 * Callback installed on the view command in main().
 *
 * For the final response it prints a banner to stdout and the final row
 * data to stderr, then returns. For every other response it prints the
 * row's document id and key and, when a document is attached to the row,
 * that document's status and CAS.
 *
 * `instance` and `cbtype` are not used by the visible code.
 *
 * NOTE(review): `rc` and `doc` are used below without a visible
 * declaration; their declarations are presumably on lines elided from this
 * view -- confirm.
 */
static void view_callback(
lcb_INSTANCE *instance,
int cbtype,
const lcb_RESPVIEW *rv)
{
if (lcb_respview_is_final(rv)) {
const char *row;
size_t nrow;
lcb_respview_row(rv, &row, &nrow);
/* Note: the banner goes to stdout while the data itself goes to stderr. */
printf("*** META FROM VIEWS ***\n");
fprintf(stderr, "%.*s\n", (int)nrow, row);
return;
}
const char *key, *docid;
size_t nkey, ndocid;
lcb_respview_key(rv, &key, &nkey);
lcb_respview_doc_id(rv, &docid, &ndocid);
/* Lengths are passed via %.*s because the buffers come with explicit sizes. */
printf("Got row callback from LCB: RC=0x%X, DOCID=%.*s. KEY=%.*s\n", rc, (int)ndocid, docid, (int)nkey, key);
lcb_respview_document(rv, &doc);
if (doc) {
rc = lcb_respget_status(doc);
uint64_t cas;
lcb_respget_cas(doc, &cas);
printf(" Document for response. RC=0x%X. CAS=0x%llx\n", rc, (long long)cas);
}
}
/*
 * Program entry point.
 *
 * Usage: argv[1] is the connection string; optionally argv[2] is the
 * password and argv[3] the username. The visible code configures the
 * connection options, seeds rand(), creates the Zipkin tracer, schedules a
 * view query ("beer" / "by_location", reduce=false&limit=3, with documents
 * included) and finally flushes the tracer.
 *
 * NOTE(review): `err`, `instance`, `tracer` and `handle` are used without a
 * visible declaration, and the calls that would assign `err` are missing,
 * so several lines of this function are elided from this view -- confirm
 * against the full file before relying on the sequence shown here.
 */
int main(int argc, char *argv[])
{
lcb_CREATEOPTS *create_options = NULL;
lcbtrace_SPAN *span = NULL;
if (argc < 2) {
fprintf(stderr, "Usage: %s couchbase://host/bucket [ password [ username ] ]\n", argv[0]);
exit(EXIT_FAILURE);
}
/* NOTE(review): as shown, create_options is still NULL here; the call that
 * allocates it is presumably elided -- confirm. */
lcb_createopts_connstr(create_options, argv[1], strlen(argv[1]));
/* Credentials are applied only when both password and username are given;
 * argv[3] (username) is passed first, then argv[2] (password). */
if (argc > 3) {
lcb_createopts_credentials(create_options, argv[3], strlen(argv[3]), argv[2], strlen(argv[2]));
}
/* Seed rand(), which is used for span sampling and the simulated delay.
 * NOTE(review): time() is used but <time.h> is not among the visible includes. */
srand(time(NULL));
lcb_createopts_destroy(create_options);
/* NOTE(review): three consecutive checks of `err` with no visible
 * assignment in between; the create/connect/wait calls are presumably
 * elided -- confirm. */
if (err != LCB_SUCCESS) {
die(NULL, "Couldn't create couchbase handle", err);
}
if (err != LCB_SUCCESS) {
die(instance, "Couldn't schedule connection", err);
}
if (err != LCB_SUCCESS) {
die(instance, "Couldn't bootstrap from cluster", err);
}
tracer = zipkin_new();
{
/* Sleeps for a random 0..999 microseconds; `ref` is filled in as a
 * child-of reference to `span`, but `encoding` and `ref` are not used by
 * the visible lines. */
int encoding_time_us = rand() % 1000;
lcbtrace_SPAN *encoding;
lcbtrace_REF ref;
ref.type = LCBTRACE_REF_CHILD_OF;
ref.span = span;
usleep(encoding_time_us);
}
/* Build and schedule the view query; rows are delivered to view_callback. */
lcb_CMDVIEW *vcmd;
char *doc_name = "beer";
char *view_name = "by_location";
char *options = "reduce=false&limit=3";
lcb_cmdview_create(&vcmd);
lcb_cmdview_callback(vcmd, view_callback);
lcb_cmdview_design_document(vcmd, doc_name, strlen(doc_name));
lcb_cmdview_view_name(vcmd, view_name, strlen(view_name));
lcb_cmdview_option_string(vcmd, options, strlen(options));
lcb_cmdview_include_docs(vcmd, 1);
/* NOTE(review): as shown, `span` is still NULL when passed as the parent. */
lcb_cmdview_parent_span(vcmd, span);
lcb_cmdview_handle(vcmd, &handle);
err = lcb_view(instance, NULL, vcmd);
/* The command object is destroyed right after scheduling, before the
 * result is checked. */
lcb_cmdview_destroy(vcmd);
if (err != LCB_SUCCESS) {
die(instance, "Couldn't schedule view operation", err);
}
fprintf(stderr, "Will wait for view operation to complete..\n");
/* NOTE(review): no visible call waits for the operation or destroys the
 * instance before the flush -- presumably elided; confirm. */
zipkin_flush(tracer);
return 0;
}