Function: Advanced Keep the Last N User Items

  • Capella Operational
      +

      Keep the last N user notifications related to a user ID.

      The advancedKeepLastN function:

      • Demonstrates how to keep a user record with the last N activities

      • Requires Eventing Storage (or a metadata collection) and a source collection

      • Requires a binding of type bucket alias

      • Operates on any mutation where the KEY starts with nu: in the form nu:#:#

        • The KEY nu:#:# has 2 numbers. The first is an increasing notification number and the second is the user ID.

      • Only keeps N records for each user

      • Removes the earliest notification record for a user when a newly inserted record would take that user past N records

      The following example assumes that the notification number always increases over time, and it ignores any duplicates. It keeps only the 3 most recent notifications (N = 3) for each user ID.

      • advancedKeepLastN

      • Input data

      • Output data

      There are two variants of this function available: one for Couchbase Server version 6.6 that implements userspace CAS, and one for Couchbase Server versions 6.6.1+/7.0.0+ that uses true CAS.

      The following example uses true CAS.

      // Configure the settings for the advancedKeepLastN function as follows:
      //
      // Version 7.1+
      //   "Function Scope"
      //     *.* (or try bulk.data if non-privileged)
      // Version 7.0+
      //   "Listen to Location"
      //     bulk.data.source
      //   "Eventing Storage"
      //     rr100.eventing.metadata
      //   Binding(s)
      //    1. "binding type", "alias name...", "bucket.scope.collection", "Access"
      //       "bucket alias", "src_col",       "bulk.data.source",        "read and write"
      //
      // Version 6.6.1
      //   "Source Bucket"
      //     source
      //   "MetaData Bucket"
      //     metadata
      //   Binding(s)
      //    1. "binding type", "alias name...", "bucket", "Access"
      //       "bucket alias", "src_col",       "source", "read and write"
      
      /*
       * Eventing entry point, invoked with the document and metadata of each mutation.
       * Every mutation is handed to updateNotifyArrayInKV(...), which itself skips
       * any KEY that is not like nu:#:#
       */
      function OnUpdate(doc, meta) {
          const settings = {
              keepCount: 3,   // 'N': how many items are kept
              verbose: false, // If true, the debug log can be too long
          };

          updateNotifyArrayInKV(doc, meta, settings.keepCount, settings.verbose);
      }
      
      /**
       * Manipulate the in-memory document to only keep 'MAX_ARRAY' items.
       *
       * Inserts insert_json into user_doc.notifications so that the array stays
       * ordered by ascending `nid`, then drops entries from the front until at
       * most MAX_ARRAY remain. The passed-in user_doc is modified in place.
       *
       * @param {Object} user_doc - Tracking document. A missing or non-array
       *     `notifications` property is treated as an empty list.
       * @param {number} user_id - User ID; only used in the debug log.
       * @param {Object} insert_json - Entry to add; its `nid` decides the position.
       * @param {number} MAX_ARRAY - Maximum number of entries to keep.
       * @param {boolean} DEBUG - If true, log ignored duplicates.
       * @returns {Object|null} The updated user_doc, or null when nothing was changed
       *     (duplicate `nid`, or the list is full and its first `nid` is >= the new one).
       */
      function addToNtfyArray(user_doc, user_id, insert_json, MAX_ARRAY, DEBUG) {
          const ntfy_id = insert_json.nid;

          // Without this, a document lacking the array would throw on `.length` below
          if (!Array.isArray(user_doc.notifications)) {
              user_doc.notifications = [];
          }
          const ntfys = user_doc.notifications;

          if (ntfys.length === MAX_ARRAY && ntfys[0] && ntfys[0].nid >= ntfy_id) {
              // Do nothing; this is older data
              return null;
          }

          // Find the insert position; default is the end of the array
          let insert_at = ntfys.length;
          for (let i = 0; i < ntfys.length; i++) {
              if (ntfys[i].nid === ntfy_id) {
                  // Do nothing; this is duplicated data
                  if (DEBUG) log('Ignore DUP ntfy_id', ntfy_id, 'user_id', user_id, 'insert_json', insert_json);
                  return null;
              }
              if (ntfys[i].nid > ntfy_id) {
                  // First entry newer than the new one: insert in front of it
                  insert_at = i;
                  break;
              }
          }
          ntfys.splice(insert_at, 0, insert_json);

          // Ensure proper size by dropping the entries with the smallest nid.
          // A single splice also terminates when MAX_ARRAY is negative.
          const excess = ntfys.length - MAX_ARRAY;
          if (excess > 0) {
              ntfys.splice(0, excess);
          }
          return user_doc;
      }
      
      /**
       * Creates, gets, and updates (via replace) the KV tracking array document.
       *
       * For a mutation whose KEY looks like nu:<ntfy_id>:<user_id>, the document
       * "user_plus_ntfys:<user_id>" is read through the `src_col` bucket alias,
       * created first if it does not exist, updated in memory by addToNtfyArray(...)
       * and written back with couchbase.replace(...) using the metadata (CAS)
       * returned by the read. A CAS mismatch restarts the read-modify-write cycle.
       * Any other KEY, including one whose number parts do not parse, is ignored.
       *
       * @param {Object} doc - Body of the mutated document; stored under `doc` in the entry.
       * @param {Object} meta - Metadata of the mutated document; only `meta.id` is read.
       * @param {number} MAX_ARRAY - Number of notifications to keep per user.
       * @param {boolean} DEBUG - If true, log every KV operation and its result.
       * @returns {undefined}
       */
      function updateNotifyArrayInKV(doc, meta, MAX_ARRAY, DEBUG) {
          // Process ALL data like nu:#:#
          const parts = meta.id.split(':');
          if (parts.length !== 3 || parts[0] !== "nu") return;
          const ntfy_id = Number.parseInt(parts[1], 10);
          const user_id = Number.parseInt(parts[2], 10);
          // Without this check a KEY like nu:abc:xyz would create "user_plus_ntfys:NaN"
          if (Number.isNaN(ntfy_id) || Number.isNaN(user_id)) return;

          const insert_json = {
              "nid": ntfy_id,
              doc
          };
          const req_meta = {
              id: "user_plus_ntfys:" + user_id
          };

          // In version 6.6.1+, use CAS in Eventing to avoid race conditions.
          // Each pass is one get -> modify -> replace attempt; the loop only repeats
          // after losing a race (CAS mismatch, or the document appeared/disappeared).
          while (true) {
              const got = couchbase.get(src_col, req_meta);
              if (DEBUG) log('couchbase.get(src_col,', req_meta, ') success==' + got.success, got);

              if (!got.success) {
                  if (!(got.error && got.error.key_not_found)) {
                      // Do nothing; this is a big error
                      log("FAILED to insert id: " + meta.id, doc, 'res', got);
                      return;
                  }
                  // Create the document and initialize it
                  const empty_doc = {
                      "type": "user_plus_ntfys",
                      "id": user_id,
                      "notifications": []
                  };
                  const created = couchbase.insert(src_col, req_meta, empty_doc);
                  if (DEBUG) log('couchbase.insert(src_col,', req_meta, empty_doc, ') success==' + created.success, created);
                  if (!created.success && !(created.error && created.error.key_already_exists)) {
                      // Retrying cannot help; only "already exists" means another writer won the race
                      log("FAILED to create id: " + req_meta.id + " for id: " + meta.id, doc, 'res', created);
                      return;
                  }
                  // Redo the loop to force couchbase.get (this supplies the CAS for replace)
                  continue;
              }

              // EXAMPLE got.meta
              // {"id":"user_plus_ntfys:2","cas":"1609965779014844416","data_type":"json"}
              // Manipulate the copy of the user document to keep only MAX_ARRAY
              const new_doc = addToNtfyArray(got.doc, user_id, insert_json, MAX_ARRAY, DEBUG);
              if (new_doc == null) {
                  // Ignore or skip duplicated data
                  return;
              }

              // Try to replace the stored document; pass CAS to test for race conditions
              const replaced = couchbase.replace(src_col, got.meta, new_doc);
              if (DEBUG) log('couchbase.replace(src_col,', got.meta, new_doc, ') success==' + replaced.success, replaced);
              if (replaced.success) {
                  // CAS matches and operation is successful
                  return;
              }
              const rep_err = replaced.error || {};
              if (!rep_err.cas_mismatch && !rep_err.key_not_found) {
                  // Not a lost race; trying again would fail the same way
                  log("FAILED to replace id: " + req_meta.id + " for id: " + meta.id, doc, 'res', replaced);
                  return;
              }
              // Lost a race: redo the loop and try again
          }
      }

      Create a new test document set using the Query Editor to insert the data items. You do not need an Index.

      key data

      nu:1:1

      {"somekey":"someValue"}

      nu:2:2

      {"somekey":"someValue"}

      nu:3:1

      {"somekey":"someValue"}

      nu:4:1

      {"somekey":"someValue"}

      nu:5:1

      {"somekey":"someValue"}

      nu:6:2

      {"somekey":"someValue"}

      nu:7:2

      {"somekey":"someValue"}

      nu:8:1

      {"somekey":"someValue"}

      nu:9:2

      {"somekey":"someValue"}

      nu:10:2

      {"somekey":"someValue"}

        UPSERT INTO `bulk`.`data`.`source` (KEY,VALUE)
        VALUES ( "nu:1:1",  {"somekey":"someValue"} ),
        VALUES ( "nu:2:2",  {"somekey":"someValue"} ),
        VALUES ( "nu:3:1",  {"somekey":"someValue"} ),
        VALUES ( "nu:4:1",  {"somekey":"someValue"} ),
        VALUES ( "nu:5:1",  {"somekey":"someValue"} ),
        VALUES ( "nu:6:2",  {"somekey":"someValue"} ),
        VALUES ( "nu:7:2",  {"somekey":"someValue"} ),
        VALUES ( "nu:8:1",  {"somekey":"someValue"} ),
        VALUES ( "nu:9:2",  {"somekey":"someValue"} ),
        VALUES ( "nu:10:2", {"somekey":"someValue"} );
      NEW/OUTPUT: KEY user_plus_ntfys:1
      
      {
        "type": "user_plus_ntfys",
        "id": 1,
        "notifications": [{
          "nid": 4,
          "doc": {
            "somekey": "someValue"
          }
        }, {
          "nid": 5,
          "doc": {
            "somekey": "someValue"
          }
        }, {
          "nid": 8,
          "doc": {
            "somekey": "someValue"
          }
        }]
      }
      
      NEW/OUTPUT: KEY user_plus_ntfys:2
      
      {
        "type": "user_plus_ntfys",
        "id": 2,
        "notifications": [{
          "nid": 7,
          "doc": {
            "somekey": "someValue"
          }
        }, {
          "nid": 9,
          "doc": {
            "somekey": "someValue"
          }
        }, {
          "nid": 10,
          "doc": {
            "somekey": "someValue"
          }
        }]
      }