Skip to content

Commit

Permalink
Support for copy local stdin (#130)
Browse files Browse the repository at this point in the history
* New feature: Support for copy from local stdin
* Support for glob patterns in copy from local file
  • Loading branch information
DMickens authored Jan 30, 2024
1 parent 78c43a9 commit e9e0b72
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 49 deletions.
7 changes: 4 additions & 3 deletions packages/v-protocol/src/backend-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,15 @@ export class NoticeMessage implements BackendMessage, NoticeOrError {

export class VerifyFilesMessage {
public readonly name: MessageName = 'verifyFiles'
public readonly fileNames: string[]
public readonly fileNames: string[] | null
constructor(public readonly length: number,
public numFiles: number,
public files: string[],
public files: string[] | null,
public readonly rejectFile: string,
public readonly exceptionFile: string)
{
this.fileNames = [...files] // shallow copy
// shallow copy the fileNames, or null for copy from local stdin
this.fileNames = files !== null ? [...files] : null
}
}

Expand Down
10 changes: 7 additions & 3 deletions packages/v-protocol/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ export class Parser {
private parseVerifyFilesMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const numFiles = this.reader.int16() //int16 number of files, n
const fileNames: string[] = new Array(numFiles)
for (let i = 0; i < numFiles; i++) {
fileNames[i] = this.reader.cstring() //string[n], name of each file
let fileNames: string[] | null = null;
if (numFiles !== 0) {
fileNames = new Array(numFiles);
for (let i = 0; i < numFiles; i++) {
fileNames[i] = this.reader.cstring(); // string[n], name of each file
}
}

const rejectFile = this.reader.cstring() //string reject file name
const exceptionFile = this.reader.cstring() //string exceptions file name
return new VerifyFilesMessage(length, numFiles, fileNames, rejectFile, exceptionFile)
Expand Down
41 changes: 41 additions & 0 deletions packages/vertica-nodejs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,47 @@ Prepared statements are slightly different. Here we will provide a query in the
})
```

### Copy Local Commands

Copy Local commands allow you to quickly load data from a client system to your vertica database. There are two type of copy local commands, copy from local file and copy from local stdin. If REJECTED DATA or EXCEPTIONS are specified in the copy command, the provided file paths must be writable by the process running the driver. If the files exist, the driver will append to the end of them. If the files don't exist, the driver will create them first. If RETURNREJECTED is specified in place of REJECTED DATA, the rejected rows can be retrieved from the result object with result.getRejectedRows().

#### Copy From Local File

Copy from local file opens and reads the file(s) from the client system and sends the data in chunks of 64Kb to the server for insertion. The files must be readable by the process running the driver.

```javascript
const {Client} = require('vertica-nodejs')
const client = new Client()

client.connect()
client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => {
if (err) console.log(err)
client.query("COPY myTable FROM LOCAL 'ints.dat' REJECTED DATA 'rejects.txt' EXCEPTIONS 'exceptions.txt'", (err, res) => {
console.log(err || res)
client.end()
})
})
```

#### Copy From Local Stdin (stream)

Copy from local stdin in vertica-nodejs can be better described as copy from local stream. The driver supports inserting any stream of data that is an instance of `stream.Readable``. Binary and utf-8 encoded streams are supported. Since the query syntax does not specify where to access the stream in the same way that copy from local file specifies the location of the file, an additional parameter must be provided in a config object, copyStream.

```javascript
const {Client} = require('vertica-nodejs')
const client = new Client()

client.connect()
const readableStream = fs.createReadStream(filePath) // assumes filePath is a string containing the path to a data file
client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => {
if (err) console.log(err)
client.query({text: "COPY myTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
console.log(err || res.getRejectedRows())
client.end()
})
})
```

### Modifying Result Rows with RowMode

The Result.rows returned by a query are by default an array of objects with key-value pairs that map the column name to column value for each row. Often you will find you don't need that, especially for very large result sets. In this case you can provide a query object parameter containing the rowMode field set to 'array'. This will cause the driver to parse row data into arrays of values without creating an object and having key-value pairs.
Expand Down
20 changes: 10 additions & 10 deletions packages/vertica-nodejs/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,6 @@ class Client extends EventEmitter {
}

_handleLoadFile(msg) {
// initiate copy data message transfer.
// What determines the size sent to the server in each message?
this.activeQuery.handleLoadFile(msg, this.connection)
}

Expand All @@ -499,7 +497,7 @@ class Client extends EventEmitter {
}

_handleEndOfBatchResponse() {
//noop
this.activeQuery.handleEndOfBatchResponse(this.connection)
}

_handleNotice(msg) {
Expand Down Expand Up @@ -619,23 +617,25 @@ class Client extends EventEmitter {
}
}

// todo - refactor to improve readibility. Move out logic for identifying parameter types to helper function if possible
query(config, values, callback) {
// can take in strings, config object or query object
var query
var result
var readTimeout
var readTimeoutTimer
var queryCallback
let query
let result
let readTimeout
let readTimeoutTimer
let queryCallback

if (config === null || config === undefined) {
throw new TypeError('Client was passed a null or undefined query')
} else if (typeof config.submit === 'function') {
}
if (typeof config.submit === 'function') {
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
result = query = config
if (typeof values === 'function') {
query.callback = query.callback || values
}
} else {
} else { // config is a string
readTimeout = this.connectionParameters.query_timeout
query = new Query(config, values, callback)
if (!query.callback) {
Expand Down
18 changes: 17 additions & 1 deletion packages/vertica-nodejs/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,23 @@ class Connection extends EventEmitter {
this._send(serialize.EndOfBatchRequest())
}

sendCopyDataStream(msg) {
sendCopyDataStream(copyStream) {
copyStream.on('readable', () => {
let bytesRead
while ((bytesRead = copyStream.read(bufferSize)) !== null) {
if (Buffer.isBuffer(bytesRead)) { // readableStream is binary
this.sendCopyData(bytesRead)
} else { // readableStream is utf-8 encoded
this.sendCopyData(Buffer.from(bytesRead, 'utf-8'))
}
}
})
copyStream.on('end', () => {
this.sendEndOfBatchRequest()
})
}

sendCopyDataFiles(msg) {
const buffer = Buffer.alloc(bufferSize);
const fd = fs.openSync(msg.fileName, 'r');
let bytesRead = 0;
Expand Down
47 changes: 38 additions & 9 deletions packages/vertica-nodejs/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ const Result = require('./result')
const utils = require('./utils')
const fs = require('fs')
const fsPromises = require('fs').promises
const stream = require('stream')
const glob = require('glob')

class Query extends EventEmitter {
constructor(config, values, callback) {
super()

config = utils.normalizeQueryConfig(config, values, callback)

this.text = config.text
this.values = config.values
this.rows = config.rows
this.types = config.types
this.name = config.name
this.binary = config.binary || false
this.copyStream = config.copyStream || null
// use unique portal name each time
this.portal = config.portal || ''
this.callback = config.callback
Expand Down Expand Up @@ -195,6 +197,13 @@ class Query extends EventEmitter {
//do nothing, vertica doesn't support result-row count limit
}

handleEndOfBatchResponse(connection) {
if (this.copyStream) { //copy from stdin
connection.sendCopyDone()
}
// else noop, backend will send CopyDoneResponse for copy from local file to continue the process
}

prepare(connection) {
// prepared statements need sync to be called after each command
// complete or when an error is encountered
Expand Down Expand Up @@ -249,16 +258,36 @@ class Query extends EventEmitter {
}

handleCopyInResponse(connection) {
connection.sendCopyFail('No source stream defined')
connection.sendCopyDataStream(this.copyStream)
}

async handleVerifyFiles(msg, connection) {
try { // Check if the data file can be read
await fsPromises.access(msg.files[0], fs.constants.R_OK);
} catch (readInputFileErr) { // Can't open input file for reading, send CopyError
console.log(readInputFileErr.code)
connection.sendCopyError(msg.files[0], 0, '', "Unable to open input file for reading")
return;
if (msg.numFiles !== 0) { // we are copying from file, not stdin
let expandedFileNames = []
for (const fileName of msg.files) {
if (/[*?[\]]/.test(fileName)) { // contains glob pattern
const matchingFiles = glob.sync(fileName)
expandedFileNames = expandedFileNames.concat(matchingFiles)
} else {
expandedFileNames.push(fileName)
}
}
const uniqueFileNames = [...new Set(expandedFileNames)] // remove duplicates
msg.numFiles = uniqueFileNames.length
msg.fileNames = uniqueFileNames
for (const fileName of uniqueFileNames) {
try { // Check if the data file can be read
await fsPromises.access(fileName, fs.constants.R_OK);
} catch (readInputFileErr) { // Can't open input file for reading, send CopyError
connection.sendCopyError(fileName, 0, '', "Unable to open input file for reading")
return;
}
}
} else { // check to make sure the readableStream is in fact a readableStream
if (!(this.copyStream instanceof stream.Readable)) {
connection.sendCopyError(this.copyStream, 0, '', "Cannot perform copy operation. Stream must be an instance of stream.Readable")
return
}
}
if (msg.rejectFile) {
try { // Check if the rejections file can be written to, if specified
Expand Down Expand Up @@ -300,7 +329,7 @@ class Query extends EventEmitter {
}

handleLoadFile(msg, connection) {
connection.sendCopyDataStream(msg)
connection.sendCopyDataFiles(msg)
}

handleWriteFile(msg, connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@ const assert = require('assert')
const path = require('path')
const fs = require('fs')

describe('Running Copy Commands', function () {
describe('Running Copy From Local File Commands', function () {
// global pool to use for queries
const pool = new vertica.Pool()

// global file names and paths
const goodFileName = "copy-good.dat"
const badFileName = "copy-bad.dat"
const goodFilePath = path.join(process.cwd(), goodFileName);
const badFilePath = path.join(process.cwd(), badFileName)
const goodFileContents = "1|'a'\n2|'b'\n3|'c'\n4|'d'\n5|'e'" // 5 correctly formatted rows
const badFileContents = "1|'a'\n'b'|2\n3|'c'\n'd'|4\n5|'e'" // rows 2 and 4 malformed
const copyGoodName = "copy-good.dat"
const copyBadName = "copy-bad.dat"
const copyGoodPath = path.join(process.cwd(), copyGoodName);
const copyBadPath = path.join(process.cwd(), copyBadName)
const goodFileContents = "1|a\n2|b\n3|c\n4|d\n5|e\n" // 5 correctly formatted rows
const badFileContents = "6|f\ng|7\n8|h\ni|9\n10|j\n" // rows 2 and 4 malformed

// generate temporary test files, create table before tests begin
before((done) => {
fs.writeFile(goodFilePath, goodFileContents, () => {
fs.writeFile(badFilePath, badFileContents, () => {
fs.writeFile(copyGoodPath, goodFileContents, () => {
fs.writeFile(copyBadPath, badFileContents, () => {
pool.query("CREATE TABLE copyTable (num int, let char)", (done))
})
})
})

// delete temporary test files, drop table after tests are complete
after((done) => {
fs.unlink(goodFilePath, () => {
fs.unlink(badFilePath, () => {
fs.unlink(copyGoodPath, () => {
fs.unlink(copyBadPath, () => {
pool.query("DROP TABLE IF EXISTS copyTable", () => {
pool.end(done)
})
Expand Down Expand Up @@ -66,7 +66,7 @@ describe('Running Copy Commands', function () {
assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents
fs.readFile('rejects.txt', 'utf8', (err, data) => {
assert.equal(err, undefined)
assert.equal(data, "'b'|2\n'd'|4\n") // rows 2 and 4 are malformed
assert.equal(data, "g|7\ni|9\n") // rows 2 and 4 are malformed
})
} finally {
fs.unlink('rejects.txt', done)
Expand Down Expand Up @@ -185,20 +185,25 @@ describe('Running Copy Commands', function () {
});

})
it ('behaves properly with ABORT ON ERROR', function(done) {
done()
})

it('succeeds using glob patterns', function(done) {
done()
})

it('succeeds with multiple input files', function(done) {
done()
pool.query("COPY copyTable FROM LOCAL 'copy-good.dat', 'copy-bad.dat' RETURNREJECTED", (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 8) // 5 good rows in goodFileContents
assert.deepEqual(res.getRejectedRows(), [7, 9])
done()
})
})

it('succeeds with basic copy from stdin command', function(done) {
//todo
done()
it('succeeds using glob patterns', function(done) {
pool.query("COPY copyTable FROM LOCAL 'copy-*.dat' RETURNREJECTED", (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 8) // 5 good rows in goodFileContents
assert.equal(res.getRejectedRows().length, 2) // check the length instead of position in case the order of files loaded changes
pool.query({text: "SELECT num FROM copyTable ORDER BY num ASC", rowMode: 'array'}, (err, res) => {
assert.deepEqual(res.rows, [[1],[2],[3],[4],[5],[6],[8],[10]]) // 7 and 9 malformed.
done()
})
})
})
})
Loading

0 comments on commit e9e0b72

Please sign in to comment.