We need one more big feature from the esclu command-line tool, and that’s to allow us to bulk-upload documents. Recall that in Processing Data Files Sequentially, we developed an LDJ data file containing interleaved commands and documents for Elasticsearch’s bulk API.
Here’s a truncated sample to refresh your memory:
| | {"index":{"_id":"pg11"}} |
| | {"id":11,"title":"Alice's Adventures in Wonderland","authors":...} |
| | {"index":{"_id":"pg132"}} |
| | {"id":132,"title":"The Art of War","authors":...} |
Open your index.js file and insert this new command, again before the program.parse line.
| | program |
| |   .command('bulk <file>') |
| |   .description('read and perform bulk options from the specified file') |
| |   .action(file => { |
| |     fs.stat(file, (err, stats) => { |
| |       if (err) { |
| |         if (program.json) { |
| |           console.log(JSON.stringify(err)); |
| |           return; |
| |         } |
| |         throw err; |
| |       } |
| | |
| |       const options = { |
| |         url: fullUrl('_bulk'), |
| |         json: true, |
| |         headers: { |
| |           'content-length': stats.size, |
| |           'content-type': 'application/json', |
| |         } |
| |       }; |
| |       const req = request.post(options); |
| | |
| |       const stream = fs.createReadStream(file); |
| |       stream.pipe(req); |
| |       req.pipe(process.stdout); |
| |     }); |
| |   }); |
This command takes a bit more code than the previous ones, but it’s mostly stuff you’ve seen before.
Unlike the get and url commands that took an optional parameter, the bulk command’s <file> parameter is required. You can try running esclu bulk without a file parameter to see how the Commander module handles this situation.
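If you want to see the difference in isolation, here is a short throwaway sketch, separate from esclu's index.js, contrasting Commander's required and optional argument syntax:
| | 'use strict'; |
| | const program = require('commander'); |
| | |
| | // <file> (angle brackets) marks a required argument; Commander prints |
| | // "missing required argument" and exits if it is omitted. |
| | program |
| |   .command('bulk <file>') |
| |   .action(file => console.log(`bulk called with ${file}`)); |
| | |
| | // [path] (square brackets) marks an optional argument, as with esclu's |
| | // get and url commands; the callback receives undefined when it's omitted. |
| | program |
| |   .command('get [path]') |
| |   .action(path => console.log(`get called with ${path || '(no path)'}`)); |
| | |
| | program.parse(process.argv); |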
Inside the action callback, the first thing we do is use fs.stat to asynchronously check on the provided file. This verifies that the file exists and is accessible to the user running the process. If for any reason the stat call fails, we produce the appropriate response: either outputting a JSON object or throwing an exception, depending on whether the user specified the --json flag.
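One detail worth knowing about that JSON branch: JSON.stringify on an Error only includes its enumerable own properties. For fs errors, those include code, errno, syscall, and path, which is generally what a script consuming esclu's --json output needs. A quick standalone sketch (with a made-up filename) shows what gets printed:
| | 'use strict'; |
| | const fs = require('fs'); |
| | |
| | // Stat a file that does not exist to see what the --json branch would print. |
| | fs.stat('no-such-file.ldj', (err, stats) => { |
| |   if (err) { |
| |     // Prints something like: |
| |     // {"errno":-2,"code":"ENOENT","syscall":"stat","path":"no-such-file.ldj"} |
| |     // (message and stack are non-enumerable, so JSON.stringify omits them). |
| |     console.log(JSON.stringify(err)); |
| |     return; |
| |   } |
| |   console.log(stats.size); |
| | }); |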
Next, we construct the options for the request. Elasticsearch’s _bulk API endpoint expects to receive JSON and we expect to receive JSON back, so we set the json option to true as well as provide a content-type header of application/json.
Using the size information from the stat call, we can specify the HTTP header content-length. This is important because we’ll be streaming the file content to the server rather than handing all the content to the Request module at once.
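To appreciate why streaming matters, consider the alternative: reading the whole file into memory with fs.readFile and handing it to the Request module as the request body. A rough sketch of that approach follows; it is not what esclu does, and the path and URL are hard-coded stand-ins for the values esclu computes from its flags and fullUrl().
| | 'use strict'; |
| | const fs = require('fs'); |
| | const request = require('request'); |
| | |
| | // Hypothetical path and URL, standing in for esclu's arguments. |
| | const file = '../data/bulk_pg.ldj'; |
| | const url = 'http://localhost:9200/books/book/_bulk'; |
| | |
| | // Buffering alternative: the entire bulk file is held in memory as `body`. |
| | fs.readFile(file, (err, body) => { |
| |   if (err) throw err; |
| |   request.post({url, headers: {'content-type': 'application/json'}, body}, |
| |     (postErr, res, resBody) => { |
| |       if (postErr) throw postErr; |
| |       console.log(resBody); |
| |     }); |
| | }); |
That works for modest files, but its memory use grows with the size of the bulk file, whereas the streaming version's memory use stays roughly constant.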
Using request.post, we initialize an HTTP POST request to Elasticsearch, capturing the returned object in a variable called req. This object can be used as a writable stream (stream.Writable) for sending content, and also as a readable stream (stream.Readable) for receiving the server’s response.
This means we can pipe content into and out of it, according to Node.js's stream APIs.[60]
Lastly, we open a read stream on the file using fs.createReadStream and pipe it into the request object, and we pipe the request object's output, which carries the server's response, directly to process.stdout. The upshot of this approach is that neither the bulk file nor the response from Elasticsearch ever needs to be wholly resident in the Node.js process's memory.
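One caveat with pipe chains like this is that pipe() does not forward errors from one stream to the next; each stream emits its own error event. The listing above keeps things simple, but if you want esclu to report problems such as an unreachable Elasticsearch server or an unreadable file, you can attach handlers yourself. Here's a hedged, self-contained sketch of that idea; it is not part of the book's listing, and the file path and URL are stand-ins.
| | 'use strict'; |
| | const fs = require('fs'); |
| | const request = require('request'); |
| | |
| | // Hypothetical stand-ins for the values esclu computes; for brevity this |
| | // sketch omits the content-length header that esclu sets from fs.stat. |
| | const file = '../data/bulk_pg.ldj'; |
| | const req = request.post('http://localhost:9200/books/book/_bulk'); |
| | const stream = fs.createReadStream(file); |
| | |
| | // pipe() does not forward errors, so listen on each stream explicitly. |
| | stream.on('error', err => { |
| |   console.error(`Could not read bulk file: ${err.message}`); |
| |   req.abort();  // cancel the in-flight request |
| | }); |
| | req.on('error', err => { |
| |   console.error(`Request to Elasticsearch failed: ${err.message}`); |
| | }); |
| | |
| | stream.pipe(req); |
| | req.pipe(process.stdout); |
Whether you add handlers like these to esclu is a judgment call; for a one-off command-line tool, a crash with a stack trace may be perfectly acceptable.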
Let’s try out the bulk command, first by failing to supply a file path to see how Commander responds.
| | $ ./esclu bulk |
| | |
| | error: missing required argument `file' |
No surprise here: esclu knows that the file parameter is required.
Now let’s try performing a bulk file insertion. Since the bulk file we created does not list an index or type for each document to insert, we should provide defaults using the --index and --type flags, respectively. Also, since the output will be large, we’ll capture it in a file and then explore it with jq.
Here’s the command you should run:
| | $ ./esclu bulk ../data/bulk_pg.ldj -i books -t book > ../data/bulk_result.json |
This command assumes that you’ve been following along, with a data directory that’s a sibling of the esclu project directory, and that you’ve created or downloaded the bulk_pg.ldj as described in Chapter 5, Transforming Data and Testing Continuously. If your file is somewhere else, or you’d like to store the result JSON somewhere else, adjust your paths accordingly.
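Those -i and -t flags take effect because the bulk command builds its URL with fullUrl('_bulk'), and that helper, written earlier in the chapter, folds the index and type into the path. The sketch below captures that behavior as a standalone function; buildUrl and its parameters are stand-ins for illustration, and the book's actual fullUrl may differ in its details.
| | // Pure-function sketch of the URL-building behavior behind --index and |
| | // --type; host, port, index, and type stand in for esclu's options. |
| | const buildUrl = ({host = 'localhost', port = 9200, index, type}, path = '') => { |
| |   let url = `http://${host}:${port}/`; |
| |   if (index) { |
| |     url += `${index}/`; |
| |     if (type) { |
| |       url += `${type}/`; |
| |     } |
| |   } |
| |   return url + path.replace(/^\/*/, ''); |
| | }; |
| | |
| | // With -i books -t book, the bulk request goes to: |
| | // http://localhost:9200/books/book/_bulk |
| | console.log(buildUrl({index: 'books', type: 'book'}, '_bulk')); |
Because the index and type appear in the URL, Elasticsearch uses them as defaults for any action line in the bulk file that doesn't specify its own.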
That command may take a while to finish, depending on a variety of factors. For me, on my Ubuntu laptop with an SSD, it takes only a few seconds, but it’s hard to tell if that’s typical. When the command is finished, take a peek at the JSON using jq.
| | $ cat ../data/bulk_result.json | jq '.' | head -n 20 |
| | { |
| |   "took": 3410, |
| |   "errors": false, |
| |   "items": [ |
| |     { |
| |       "index": { |
| |         "_index": "books", |
| |         "_type": "book", |
| |         "_id": "pg1", |
| |         "_version": 1, |
| |         "result": "created", |
| |         "_shards": { |
| |           "total": 2, |
| |           "successful": 1, |
| |           "failed": 0 |
| |         }, |
| |         "created": true, |
| |         "status": 201 |
| |       } |
| |     }, |
Three keys are immediately visible in the response JSON object: took, the number of milliseconds the bulk request took to process; errors, a Boolean that will be true if any of the operations failed; and items, an array containing the result of each operation.
Each object in the items array describes one of the bulk commands. Here we can see just the first such command, whose index key tells the story of the operation.
Note that the status key of the index object has the value 201. You may already be familiar with the HTTP status code 200 OK. Like 200 OK, the HTTP status code 201 Created is also an affirmative code, but it means that an object on the server was created as a result of the request.
Using jq’s length function, we can count how many total operations there were.
| | $ cat ../data/bulk_result.json | jq '.items | length' |
| | 53212 |
Of course, the number of items you see may be different. Project Gutenberg is adding new books to its catalog all the time.
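If you'd rather do that kind of check from Node.js instead of jq, a short script can read the captured result file and summarize it. This is just a sketch; the path matches the one used above, and the took, errors, items, and status keys are the ones we just saw in the response.
| | 'use strict'; |
| | const fs = require('fs'); |
| | |
| | // Summarize the bulk response captured above. |
| | fs.readFile('../data/bulk_result.json', (err, data) => { |
| |   if (err) throw err; |
| |   const result = JSON.parse(data); |
| |   console.log(`took ${result.took}ms, errors: ${result.errors}`); |
| |   console.log(`${result.items.length} operations`); |
| | |
| |   // List how many operations did not come back with a 2xx status. |
| |   const failures = result.items.filter(item => item.index.status >= 300); |
| |   console.log(`${failures.length} failed operations`); |
| | }); |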
Now, using our list-indices command, let’s check how many documents the books index has:
| | $ ./esclu li |
| | health status index uuid pri rep docs.count store.size pri.store.size |
| | yellow open books n9...sQ 5 1 53212 24.5mb 24.5mb |
Wonderful! As you can see under the docs.count column, all 53,212 documents have been successfully added to the books index.