Import Data with an SDK

    How to import documents into Couchbase with an SDK.

    Introduction

    Importing data can be done from the Couchbase Server UI, via the cbimport command-line tool shipped with Couchbase Server, or using the SDK to script the process.

    Data load essentially consists of the following steps:

    1. Prepare data in some well-known format such as Comma Separated Values (.csv) or JSON documents.

    2. Parse this data, and iterate over each document.

    3. Connect to your Couchbase instance.

    4. Connect to the appropriate bucket, scope, and collection.

    5. Decide on the key for this document (could be an ID, a sequence number, or some combination of fields).

    6. Do any additional processing if required.

    7. Insert the document.

    Couchbase Clients

    Clients access data by connecting to a Couchbase cluster over the network. The most common type of client is a Couchbase SDK, a full programmatic API that enables applications to take the best advantage of Couchbase. This developer guide focuses on the most commonly used SDKs, but full explanations and reference documentation for all SDKs are available.

    The command line clients also provide a quick and streamlined interface for simple access and are suitable if you just want to access an item without writing any code. For this guide, we are especially interested in the cbimport tool.

    With some editions, the command line clients are provided as part of the installation of Couchbase Server. Assuming a default installation, you can find them in the following location, depending on your operating system:

    Linux

    /opt/couchbase/bin

    Windows

    C:\Program Files\Couchbase\Server\bin

    macOS

    /Applications/Couchbase Server.app/Contents/Resources/couchbase-core/bin

    The Couchbase Server UI also offers a graphical interface to cbimport.

    Preparing the Data

    To prepare the data, extract or generate it in an appropriate format.

    The following formats are well supported, both for export from other tools and by cbimport and the module ecosystems of all Couchbase SDKs.

    • CSV

    • TSV

    • JSON

    • JSONL

    Comma Separated Values (.csv) are easily exported from many spreadsheet and database applications.

    Ensure that the first row is a header row containing the names of the columns within the document.

    id,type,name
    20001,airline,CSV-air-1
    20002,airline,CSV-air-2

    Tab Separated Values (.tsv) are a common variant of CSV files.

    id	type	name
    20011	airline	TSV-air-1
    20012	airline	TSV-air-2

    JSON (.json) files are especially well suited to importing into Couchbase, as JSON is its native data type.

    A .json file contains a single value, so to allow importing one or many documents, format the file as an array of the values you want to store.

    [
    	{"id":20021, "type":"airline", "name":"JSON-air-1"},
    	{"id":20022, "type":"airline", "name":"JSON-air-2"},
    	{"id":20023, "type":"airline", "name":"JSON-air-3"}
    ]

    JSON Lines (.jsonl), also known as NDJSON, is a common format for streaming JSON, with one JSON object per line of text.

    {"id":20031, "type":"airline", "name":"JSONL-air-1"}
    {"id":20032, "type":"airline", "name":"JSONL-air-2"}
    {"id":20033, "type":"airline", "name":"JSONL-air-3"}

    Using cbimport

    Using cbimport is straightforward. Ensure that the directory containing the Couchbase Server command-line clients is in your PATH.

    You can import all of the data formats described above.

    • CSV

    • TSV

    • JSON

    • JSONL

    To import a CSV file using cbimport csv:

    1. Use the --dataset argument to specify the CSV file.

    2. Use the --cluster, --username, and --password arguments to specify your connection details.

    3. Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.

    4. Use the --generate-key argument to specify an ID for the imported documents.


    The following example imports a local CSV file, generating IDs such as airline_1234.

    cbimport csv \
      --dataset file://./import.csv \
      --cluster localhost --username Administrator --password password \
      --bucket travel-sample --scope-collection-exp inventory.airline \
      --generate-key %type%_%id%

    To import a TSV file using cbimport csv:

    1. Use the --dataset argument to specify the TSV file.

    2. Use the --field-separator argument to specify the field separation character, such as "\t" for tab.

    3. Use the --cluster, --username, and --password arguments to specify your connection details.

    4. Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.

    5. Use the --generate-key argument to specify an ID for the imported documents.


    The following example imports a local TSV file, generating IDs such as airline_1234.

    cbimport csv \
      --dataset file://./import.tsv --field-separator "\t" \
      --cluster localhost --username Administrator --password password \
      --bucket travel-sample --scope-collection-exp inventory.airline \
      --generate-key %type%_%id%

    To import a JSON file using cbimport json:

    1. Use the --dataset argument to specify the JSON file.

    2. Set the --format argument to list.

    3. Use the --cluster, --username, and --password arguments to specify your connection details.

    4. Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.

    5. Use the --generate-key argument to specify an ID for the imported documents.


    The following example imports a local JSON file, generating IDs such as airline_1234.

    cbimport json \
      --dataset file://./import.json --format list \
      --cluster localhost --username Administrator --password password \
      --bucket travel-sample --scope-collection-exp inventory.airline \
      --generate-key %type%_%id%

    To import a JSONL file using cbimport json:

    1. Use the --dataset argument to specify the JSONL file.

    2. Set the --format argument to lines.

    3. Use the --cluster, --username, and --password arguments to specify your connection details.

    4. Use the --bucket and --scope-collection-exp arguments to specify the bucket, scope, and collection as required.

    5. Use the --generate-key argument to specify an ID for the imported documents.


    The following example imports a local JSONL file, generating IDs such as airline_1234.

    cbimport json \
      --dataset file://./import.jsonl --format lines \
      --cluster localhost --username Administrator --password password \
      --bucket travel-sample --scope-collection-exp inventory.airline \
      --generate-key %type%_%id%

    Importing Using an SDK

    While cbimport accomplishes all the necessary steps in a single command, as shown above, using an SDK gives you more flexibility and control. However, all the same considerations apply, so let us look at each in turn.

    Parsing the Import into an Array or Stream of Records

    The details of how to parse the import data vary depending on the chosen input format, and the most appropriate library for your SDK.

    Parsing CSV and TSV Data

    • .NET

    • Java

    • Node.js

    • Python

    To parse CSV and TSV data, use the CsvHelper library.

    using CsvHelper;
    using CsvHelper.Configuration;
    using System.Globalization;
    public async Task importCSV(string filename)
    {
        using (var reader = new StreamReader(filename))
        using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
        {
            var records = csv.GetRecords<dynamic>();
            foreach (dynamic record in records) {
                await upsertDocument(record);
            }
        }
    }
    public async Task importTSV(string filename)
    {
        using (var reader = new StreamReader("import.tsv"))
        using (var tsv = new CsvReader(reader,
            new CsvConfiguration(CultureInfo.InvariantCulture)
                { Delimiter = "\t" }))
        {
            var records = tsv.GetRecords<dynamic>();
            foreach (dynamic record in records) {
                await upsertDocument(record);
            }
        }
    }

    To parse CSV and TSV data, use the opencsv library.

    import com.opencsv.*;
    public void importCSV() {
      try (CSVReaderHeaderAware csv = new CSVReaderHeaderAware(
          new FileReader("modules/howtos/examples/import.csv"))) {
              
        Map<String, String> row;
        while ((row = csv.readMap()) != null) {
          upsertRow(row);
        }
      }
      catch (java.io.FileNotFoundException e) {
        System.out.println("handle FileNotFoundException...");
      }
      catch (java.io.IOException e) {
        System.out.println("handle IOException...");
      }
      catch (com.opencsv.exceptions.CsvValidationException e) {
        System.out.println("handle CsvValidationException...");
      }
    }
    public void importTSV() {
      CSVParser parser =
        new CSVParserBuilder()
        .withSeparator('\t')
        .withIgnoreQuotations(true)
        .build();
      
      try (CSVReaderHeaderAware tsv =
          new CSVReaderHeaderAwareBuilder(
            new FileReader("modules/howtos/examples/import.tsv"))
          .withCSVParser(parser)
          .build()) {
    
        Map<String, String> row;
        while ((row = tsv.readMap()) != null) {
          upsertRow(row);
        }
      }
      // ...
    }

    If you are using the Reactor API then, as OpenCSV doesn’t have a built-in converter, use the Flux::generate method to convert the CSV or TSV file into a stream:

    public void importCSV_batch() {
      
      Flux<Map<String,String>> rows = Flux.generate(
        
        () -> new CSVReaderHeaderAware(
          new FileReader("modules/howtos/examples/import.csv")),
        
        (state, sink) -> {
          try {
            Map<String,String> row = state.readMap();
            if (row == null) { sink.complete(); }
            else { sink.next(row); }
            return state;
          }
          catch (Exception e) { throw new RuntimeException(e); }
        },
        state -> {
          try { state.close(); }
          catch (Exception e) { throw new RuntimeException(e); }
        }
      );
    
      Flux<MutationResult> results = 
        rows
        .map(row -> preprocess(row))
        .flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
        .doOnNext(System.out::println);
    
      results.blockLast(Duration.ofSeconds(60));
    }
    public void importTSV_batch() {
    
      Flux<Map<String,String>> rows = Flux.generate(
        
        () -> {
          CSVParser parser =
            new CSVParserBuilder()
            .withSeparator('\t')
            .withIgnoreQuotations(true)
            .build();
          return
            new CSVReaderHeaderAwareBuilder(
              new FileReader("modules/howtos/examples/import.tsv"))
            .withCSVParser(parser)
            .build();
        },
        
        // ...
    }

    To parse CSV and TSV data, use the csv-parse library.

    const { parse: csvParser } = require('csv-parse');
    const csvStream = (filename) =>
      fs.createReadStream(filename)
      .pipe(
          csvParser({columns: true}))
    const tsvStream = (filename) =>
      fs.createReadStream(filename)
        .pipe(
          csvParser({
            columns: true,
            delimiter: '\t'}))

    To parse CSV and TSV data, use the csv library.

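    The following is a minimal sketch using Python's built-in csv module; it assumes an upsert_document helper like the one outlined under Inserting the Documents, and the function names are illustrative.

    import csv

    def import_csv(filename):
        # csv.DictReader treats the first row as a header,
        # yielding one dict per record
        with open(filename, newline='') as infile:
            for row in csv.DictReader(infile):
                upsert_document(row)

    def import_tsv(filename):
        # the same reader, with a tab delimiter for TSV
        with open(filename, newline='') as infile:
            for row in csv.DictReader(infile, delimiter='\t'):
                upsert_document(row)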

    Parsing JSON and JSONL Data

    • .NET

    • Java

    • Node.js

    • Python

    To parse JSON and JSONL data, use Newtonsoft.

    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    public async Task importJSON(string filename)
    {
        using (var reader = new StreamReader("import.json"))
        {
            var jsonReader = new JsonTextReader(reader);
            JArray arr = (JArray)JToken.ReadFrom(jsonReader);
            
            foreach (JObject record in arr)
            {
                await upsertDocument(record);
            }
        }
    }
    public async Task importJSONL(string filename)
    {
        using (var reader = new StreamReader("import.jsonl"))
        {
            var jsonlReader = new JsonTextReader(reader)
            {
                SupportMultipleContent = true
            };
            while (jsonlReader.Read())
            {
                var record = (JObject)JToken.ReadFrom(jsonlReader);
                await upsertDocument(record);
            }
        }
    }

    To parse JSON data, read the file as a string, then use the built-in Couchbase JSON handling to parse the result into an array of JSON objects.

    import com.couchbase.client.java.json.*;
    public void importJSON() {
      try {
        String content  = new String(
          Files.readAllBytes( // read whole document into memory
            Paths.get("modules/howtos/examples/import.json")),
          StandardCharsets.UTF_8);
        
        for (Object row: JsonArray.fromJson(content)) {
          JsonObject json = ((JsonObject) row);
          upsertRow(json.toMap());
        }
      }
      catch (java.io.FileNotFoundException e) {
        System.out.println("handle FileNotFoundException...");
      }
      catch (java.io.IOException e) {
        System.out.println("handle IOException...");
      }
    }

    If you are using the Reactor API then, once you’ve read the JSON array, use the Flux::fromIterable method to convert it into a stream:

    public void importJSON_batch() {
    
      try {
        String content  = new String(
          Files.readAllBytes( // read whole document into memory
            Paths.get("modules/howtos/examples/import.json")),
          StandardCharsets.UTF_8);
        
        Flux<MutationResult> results = 
          Flux.fromIterable(JsonArray.fromJson(content))
            .map(row -> ((JsonObject) row).toMap())
            .map(map -> preprocess(map))
            .flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
            .doOnNext(System.out::println);
    
        results.blockLast(Duration.ofSeconds(60));
      }
      // ...
    }

    To parse JSONL data, do the same, but read the file line by line.

    public void importJSONL() {
      try (BufferedReader br =
            new BufferedReader(
              new FileReader("modules/howtos/examples/import.jsonl"))) {
                
          String line;
          while ((line = br.readLine()) != null) {
            Map<String,Object> row =
              JsonObject.fromJson(line).toMap();
              
            upsertRow(row);
        }
      }
      // ...
    }

    If you are using the Reactor API, open the JSONL file as a stream using the Flux::using method.

    public void importJSONL_batch() {
    
      Flux<String> lines = Flux.using(
        () -> Files.lines(Paths.get("modules/howtos/examples/import.jsonl")),
        Flux::fromStream,
        BaseStream::close);
    
      Flux<MutationResult> results =
        lines
            .map(line -> JsonObject.fromJson(line).toMap())
            .map(map -> preprocess(map))
            .flatMap(doc -> reactiveCollection.upsert(doc.getId(), doc.getContent()))
            .doOnNext(System.out::println);
    
      results.blockLast(Duration.ofSeconds(60));
    }

    To parse JSON and JSONL data, use the stream-json library.

    stream-json formats its output with a { key: …, value: … } wrapper, so we need to map the stream into the expected format.

    const stream = require('stream'); 
    
    // for JSON
    const StreamArray = require('stream-json/streamers/StreamArray')
    
    // for JsonL
    const {parser: jsonlParser} = require('stream-json/jsonl/Parser');
    const map = (f) =>
      new stream.Transform({
        objectMode: true,
        transform: (obj, _, next) => next(null, f(obj))
      })
      
    const jsonStream = (filename) =>
      fs.createReadStream(filename)
        .pipe(StreamArray.withParser())
        .pipe(map(obj => obj.value))
    const jsonlStream = (filename) =>
      fs.createReadStream(filename)
        .pipe(jsonlParser())
        .pipe(map(obj => obj.value))

    To parse JSON and JSONL data, use the json library:

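    The following is a minimal sketch using Python's built-in json module, again assuming an illustrative upsert_document helper.

    import json

    def import_json(filename):
        # a .json import file holds a single array of documents
        with open(filename) as infile:
            for doc in json.load(infile):
                upsert_document(doc)

    def import_jsonl(filename):
        # JSON Lines: one JSON object per line of text
        with open(filename) as infile:
            for line in infile:
                upsert_document(json.loads(line))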

    Connecting to the Couchbase Server

    First, you need the connection details for the Couchbase server.

    Now decide which bucket, scope, and collection you want to import to, and create them if they don’t already exist.

    • .NET

    • Java

    • Node.js

    • Python

    var cluster = await Cluster.ConnectAsync(
        "couchbase://your-ip",
        "Administrator", "password");
    var bucket =  await cluster.BucketAsync("travel-sample");
    var scope = await bucket.ScopeAsync("inventory");
    var _collection = await scope.CollectionAsync("airline");

    For more information, refer to Managing Connections.

    Cluster cluster = Cluster.connect(
      connectionString,
      ClusterOptions
        .clusterOptions(username, password));
    
    Bucket bucket = cluster.bucket(bucketName);
    Scope scope = bucket.scope("inventory");
    
    collection = scope.collection("airline");

    If you are using the Reactive API, then use the reactive collection instead:

    reactiveCollection = collection.reactive();

    For more information, refer to Managing Connections.

    const cluster = await couchbase.connect('couchbase://localhost', {
      username: 'Administrator',
      password: 'password',
    })
    
    const bucket = cluster.bucket('travel-sample')
    const collection = bucket.scope('inventory').collection('airline')

    For more information, refer to Managing Connections.

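    A minimal sketch of connecting with the Python SDK, assuming a PasswordAuthenticator and ClusterOptions (exact module paths vary slightly between SDK versions):

    from couchbase.auth import PasswordAuthenticator
    from couchbase.cluster import Cluster, ClusterOptions

    cluster = Cluster(
        'couchbase://localhost',
        ClusterOptions(PasswordAuthenticator('Administrator', 'password')))

    bucket = cluster.bucket('travel-sample')
    collection = bucket.scope('inventory').collection('airline')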

    For more information, refer to Managing Connections.

    Inserting the Documents

    Having processed each imported document, you can insert it into the keyspace. Couchbase is a key-value store, and the document is the value, so before you can insert the document, you need to determine the key.

    To insert an imported document into the keyspace:

    1. Specify the key. This could be as simple as extracting the id field from the document, or using an incrementing sequence number.

    2. Do any additional processing, for example calculating fields, or adding metadata about the importer.

    3. Finally, use an upsert operation to store the document.

    Use upsert rather than insert so that the document is stored even if the target key already has a value. This means that, in the case of any error, it is easy to make any required tweaks to the import file and re-run the whole import.

    • .NET

    • Java

    • Node.js

    • Python

    To store the data, hook the prepared data into an upsert routine.

    As CsvHelper and Newtonsoft generate different outputs, we’ve provided overloaded upsertDocument methods that work with either.

    // CsvHelper emits `dynamic` records
    public async Task upsertDocument(dynamic record) {
        // define the key
        string key = record.type + "_" + record.id;
    
        // do any additional processing
        record.importer = ".NET SDK";
    
        // upsert the document
        await _collection.UpsertAsync(key, record);
    
        // any required logging
        Console.WriteLine(key);
    }
    
    // Newtonsoft.Json.Linq emits `JObjects`
    public async Task upsertDocument(JObject record) {
        // define the key
        string key = record["type"] + "_" + record["id"];
    
        // do any additional processing
        record["importer"] = ".NET SDK";
        
        // upsert the document
        await _collection.UpsertAsync(key, record);
        
        // any required logging
        Console.WriteLine(key);
    }

    For more information, refer to Data Operations.

    To store the data, hook the prepared data into an upsert routine. For the blocking API, use the method below.

    public void upsertRow(Map row) {
      
      JsonDocument doc = preprocess(row);
      
      String key = doc.getId();
      Object value = doc.getContent();
      
      // upsert the document
      collection.upsert(key, value);
      
      // any required logging
      System.out.println(key);
      System.out.println(value);
    }

    The Reactive API examples above already include a call to ReactiveCollection::upsert.

    In both cases, you must provide a preprocess routine which returns a key-value tuple object:

    class JsonDocument {
      private final String id;
      private final JsonObject content;
    
      public JsonDocument(String id, JsonObject content) {
        this.id = id;
        this.content = content;
      }
    
      public String getId() {
        return id;
      }
    
      public JsonObject getContent() {
        return content;
      }
    
      @Override
      public String toString() {
        return "JsonDocument{id='" + id + "', content=" + content + "}";
      }
    }
    public JsonDocument preprocess(Map row) {
      Map value = new HashMap(row);
    
      // define the KEY
      String key = value.get("type") + "_" + value.get("id");
    
      // do any additional processing
      value.put("importer", "Java SDK");
      
      return new JsonDocument(key, JsonObject.from(value));
    }

    For more information, refer to Data Operations.

    To iterate the prepared data stream, use a for await…of loop, in the same way as you would iterate an array.

    const importStream = async (stream) => {
      for await (const doc of stream) {
        await upsertDocument(doc)
      }
    }

    Hook the prepared stream into an upsertDocument routine:

    const upsertDocument = async (doc) => {
      try {
        // Build the key
        const key = `${doc.type}_${doc.id}`
        
        // Do any processing, logging etc.
        doc.importer = "import.js"
        console.log(key, doc)
        
        // Upsert the document
        await collection.upsert(key, doc)
      }
      catch (error) {
        // Error handling, retry, logging etc.
        console.error(error)
      }
    }

    For more information, refer to Data Operations.

    To store the data, define functions to determine the key and to process the value.

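    A minimal sketch, with illustrative names (make_key builds the document ID, process adds any extra fields):

    def make_key(doc):
        # define the key, for example type_id => airline_20021
        return '{}_{}'.format(doc['type'], doc['id'])

    def process(doc):
        # do any additional processing
        doc['importer'] = 'Python SDK'
        return doc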

    Hook the prepared data into an upsert routine which uses these functions.

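    A minimal sketch of such a routine (named upsert_document here, matching the parsing examples above):

    def upsert_document(doc):
        # determine the key and do any additional processing
        key = make_key(doc)
        value = process(doc)

        # upsert the document
        collection.upsert(key, value)

        # any required logging
        print(key, value)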

    For more information, refer to Data Operations.

    Note that the Python SDK offers a set of batch operations (marked as volatile as of SDK 3.2.3) which may be more efficient. Here’s a brief example for CSV:

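    A minimal sketch, assuming the collection's upsert_multi batch operation, which takes a dict mapping keys to documents, and reusing the helpers defined above:

    import csv

    def import_csv_batch(filename):
        # build the whole batch of key -> document pairs
        with open(filename, newline='') as infile:
            docs = {make_key(row): process(row) for row in csv.DictReader(infile)}

        # upsert the whole batch in one call
        collection.upsert_multi(docs)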

    Reference and information:

    • The Couchbase Server UI offers a graphical view of documents, so you can check your imports interactively.
