Skip to content

Commit

Permalink
CrateDB bulk operations: Add support for improved DML efficiency
Browse files Browse the repository at this point in the history
https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

- In order to use the bulk operations interface, a `PDOStatement` needs
  to be prepared using the `bulkMode` option, like
  `->prepare($sql, ["bulkMode" => true])`.
- The interface of `BulkResponse` has been made compatible with
  `Collection`, specifically wrt. the `getRows()` method, in order to
  return data from the driver without needing other proprietary methods.
- In order to propagate the non-standard bulk response shape back, the
  user has to select the `PDO::FETCH_NUM` fetch style.
- Documentation: Add two example programs about insert operations
  • Loading branch information
amotl committed May 4, 2023
1 parent 28cc2a9 commit 8c706da
Show file tree
Hide file tree
Showing 15 changed files with 679 additions and 48 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ Changelog for crate-pdo
Unreleased
==========

- Added support for `CrateDB bulk operations`_, for improved efficiency on
DML operations.

.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

2022/11/29 2.1.4
================

Expand Down
37 changes: 21 additions & 16 deletions DEVELOP.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ If the environment is outdated, you can upgrade it::

composer update

If you see messages like ``Warning: No code coverage driver available`` when running
the tests, you will need to install the ``xdebug`` extension::

pecl install xdebug

It may happen that you will have to re-install it, for example after your PHP
version has been upgraded by your package manager.


Running the Tests
=================
Expand All @@ -60,16 +68,26 @@ You can run the tests like::
# Run test suite
composer run test

# Run code style checks
composer run style

# Output coverage report as HTML
composer run -- test --coverage-html ./report
open report/index.html

# Run specific tests
composer run -- test --filter "testFetchColumn"


Invoke code style checks
========================

::

# Run code style checks
composer run check-style

# Some code style quirks can be automatically fixed
composer run fix-style



************
Using Docker
Expand Down Expand Up @@ -121,19 +139,6 @@ Running the Tests
open build/multicover/html/index.html


Invoke code style checks
========================

::

# Run code style checks
composer run check-style

# Some code style quirks can be automatically fixed
composer run fix-style



****************************
Working on the documentation
****************************
Expand Down
34 changes: 34 additions & 0 deletions docs/connect.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,40 @@ method used.
| | ``PDO::FETCH_OBJ`` |
+----------------------------+-----------------------+

Bulk operations
===============

With CrateDB :ref:`crate-reference:http-bulk-ops`, suitable for ``INSERT``,
``UPDATE``, and ``DELETE`` statements, you can submit multiple records, aka.
batches, to CrateDB within a single operation. By using this way of communication,
both the client and the server will not waste resources on building and decoding
huge SQL statements, and data will also propagate more efficiently between CrateDB
cluster nodes.

To use this mode, the ``PDOStatement`` offers a corresponding ``bulkMode`` option.
When creating a statement instance with it, the ``$parameters`` data will be
obtained as a **list of records**, like demonstrated in the example below.

Please note that you **must** use ``PDO::FETCH_NUM`` on the fetch operation,
because the response object type ``BulkResponse`` is different than the regular
response type ``Collection``.

.. code-block:: php
// Run insert operation.
$parameters = [[5, 'foo', 1], [6, 'bar', 2], [7, 'foo', 3], [8, 'bar', 4]];
$statement = $connection->prepare(
'INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)',
array("bulkMode" => true));
$statement->execute($parameters);
// Evaluate response.
// MUST use `PDO::FETCH_NUM` for returning bulk operation responses.
print("Total count: {$statement->rowCount()}\n");
$response = $statement->fetchAll(PDO::FETCH_NUM);
print_r($response);
Next steps
==========

Expand Down
39 changes: 39 additions & 0 deletions examples/insert_basic.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php
/*
* Basic example demonstrating how to connect to CrateDB using PHP PDO.
*
* Prerequisites:
*
* docker run --rm -it --publish=4200:4200 crate
*
* Synopsis:
*
* php examples/insert_basic.php
*/
include("./vendor/autoload.php");

error_reporting(E_ALL ^ E_DEPRECATED);

// Connect to CrateDB.
use Crate\PDO\PDO;
$connection = new PDO("crate:localhost:4200", "crate");

// Create database table.
$connection->exec("DROP TABLE IF EXISTS test_table;");
$connection->exec("CREATE TABLE test_table (id INTEGER, name STRING, int_type INTEGER);");

// Run insert operation.
$statement = $connection->prepare('INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)');
$statement->execute([5, 'foo', 1]);
$statement->execute([6, 'bar', 2]);

// Evaluate response.
print("Total count: {$statement->rowCount()}\n");
$response = $statement->fetchAll(PDO::FETCH_NUM);
print_r($response);

// Disconnect from database.
// https://www.php.net/manual/en/pdo.connections.php
// https://stackoverflow.com/questions/18277233/pdo-closing-connection
$statement = null;
$connection = null;
43 changes: 43 additions & 0 deletions examples/insert_bulk.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php
/*
* Example demonstrating how to use CrateDB's bulk operations interface for
* inserting large amounts of data efficiently, using PHP PDO.
*
* Prerequisites:
*
* docker run --rm -it --publish=4200:4200 crate
*
* Synopsis:
*
* php examples/insert_bulk.php
*/
include("./vendor/autoload.php");

error_reporting(E_ALL ^ E_DEPRECATED);

// Connect to CrateDB.
use Crate\PDO\PDO;
$connection = new PDO("crate:localhost:4200", "crate");

// Create database table.
$connection->exec("DROP TABLE IF EXISTS test_table;");
$connection->exec("CREATE TABLE test_table (id INTEGER, name STRING, int_type INTEGER);");

// Run insert operation.
$parameters = [[5, 'foo', 1], [6, 'bar', 2], [7, 'foo', 3], [8, 'bar', 4]];
$statement = $connection->prepare(
'INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)',
array("bulkMode" => true));
$statement->execute($parameters);

// Evaluate response.
// MUST use `PDO::FETCH_NUM` for returning bulk operation responses.
print("Total count: {$statement->rowCount()}\n");
$response = $statement->fetchAll(PDO::FETCH_NUM);
print_r($response);

// Disconnect from database.
// https://www.php.net/manual/en/pdo.connections.php
// https://stackoverflow.com/questions/18277233/pdo-closing-connection
$statement = null;
$connection = null;
16 changes: 16 additions & 0 deletions src/Crate/PDO/Http/ServerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
namespace Crate\PDO\Http;

use Crate\PDO\PDOInterface;
use Crate\Stdlib\BulkResponseInterface;
use Crate\Stdlib\CollectionInterface;

interface ServerInterface
Expand All @@ -51,6 +52,21 @@ public function configure(PDOInterface $PDO): void;
*/
public function execute(string $queryString, array $parameters): CollectionInterface;

/**
* Execute the PDOStatement in "bulk operations" and return the response from server
* wrapped inside a BulkResponseInterface.
*
* Bulk operations are only supported for `INSERT`, `UPDATE`, and `DELETE` statements.
*
* https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
*
* @param string $queryString
* @param array $parameters
*
* @return BulkResponseInterface
*/
public function executeBulk(string $queryString, array $parameters): BulkResponseInterface;

/**
* @return array
*/
Expand Down
75 changes: 56 additions & 19 deletions src/Crate/PDO/Http/ServerPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
use Crate\PDO\Exception\RuntimeException;
use Crate\PDO\PDO;
use Crate\PDO\PDOInterface;
use Crate\Stdlib\BulkResponse;
use Crate\Stdlib\BulkResponseInterface;
use Crate\Stdlib\Collection;
use Crate\Stdlib\CollectionInterface;
use GuzzleHttp\Client;
Expand Down Expand Up @@ -67,7 +69,7 @@ final class ServerPool implements ServerInterface
/**
* Client constructor.
*
* @param array $servers
* @param array $servers
* @param ClientInterface|null $client
*/
public function __construct(array $servers, ClientInterface $client = null)
Expand All @@ -91,6 +93,24 @@ public function __construct(array $servers, ClientInterface $client = null)
* @throws \GuzzleHttp\Exception\ConnectException
*/
public function execute(string $query, array $parameters): CollectionInterface
{
return $this->executeGeneric($query, $parameters, false);
}

/**
* {@Inheritdoc}
* @throws \GuzzleHttp\Exception\ConnectException
*/
public function executeBulk(string $query, array $parameters): BulkResponseInterface
{
return $this->executeGeneric($query, $parameters, true);
}

/**
* {@Inheritdoc}
* @throws \GuzzleHttp\Exception\ConnectException
*/
private function executeGeneric(string $query, array $parameters, bool $bulk_mode = false)
{
$numServers = count($this->availableServers) - 1;

Expand All @@ -101,24 +121,8 @@ public function execute(string $query, array $parameters): CollectionInterface
// Move the selected server to the end of the stack
$this->availableServers[] = array_shift($this->availableServers);

$options = array_merge($this->httpOptions, [
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
'json' => [
'stmt' => $query,
'args' => $parameters,
],
]);

try {
$response = $this->httpClient->request('POST', '/_sql', $options);
$responseBody = json_decode((string)$response->getBody(), true);

return new Collection(
$responseBody['rows'],
$responseBody['cols'],
$responseBody['duration'],
$responseBody['rowcount']
);
return $this->sendRequest($server, $query, $parameters, $bulk_mode);
} catch (ConnectException $exception) {
// Catch it before the BadResponseException but do nothing.
continue;
Expand All @@ -130,7 +134,7 @@ public function execute(string $query, array $parameters): CollectionInterface
throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
}

$errorCode = $json['error']['code'];
$errorCode = $json['error']['code'];
$errorMessage = $json['error']['message'];

throw new RuntimeException($errorMessage, $errorCode, $exception);
Expand All @@ -144,6 +148,39 @@ public function execute(string $query, array $parameters): CollectionInterface
);
}

private function sendRequest(string $server, string $query, array $parameters, bool $bulk_mode = false)
{
$args_name = 'args';
if ($bulk_mode) {
$args_name = 'bulk_args';
}
$options = array_merge($this->httpOptions, [
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
'json' => [
'stmt' => $query,
$args_name => $parameters,
],
]);

$response = $this->httpClient->request('POST', '/_sql', $options);
$responseBody = json_decode((string)$response->getBody(), true);

if ($bulk_mode) {
return new BulkResponse(
$responseBody['results'],
$responseBody['cols'],
$responseBody['duration']
);
} else {
return new Collection(
$responseBody['rows'],
$responseBody['cols'],
$responseBody['duration'],
$responseBody['rowcount']
);
}
}

/**
* {@Inheritdoc}
*/
Expand Down
7 changes: 6 additions & 1 deletion src/Crate/PDO/PDO.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class PDO extends BasePDO implements PDOInterface
'timeout' => 0.0,
'auth' => [],
'defaultSchema' => 'doc',
'bulkMode' => false,
];

/**
Expand Down Expand Up @@ -125,7 +126,11 @@ public function __construct($dsn, $username = null, $passwd = null, $options = [
$this->lastStatement = $statement;

try {
return $this->server->execute($sql, $parameters);
if ($statement->isBulkMode()) {
return $this->server->executeBulk($sql, $parameters);
} else {
return $this->server->execute($sql, $parameters);
}
} catch (Exception\RuntimeException $e) {
if ($this->getAttribute(self::ATTR_ERRMODE) === self::ERRMODE_EXCEPTION) {
throw new Exception\PDOException($e->getMessage(), $e->getCode());
Expand Down
Loading

0 comments on commit 8c706da

Please sign in to comment.