Skip to content

Commit

Permalink
CrateDB bulk operations: Add support for improved DML efficiency
Browse files Browse the repository at this point in the history
https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

- In order to use the bulk operations interface, a `PDOStatement` needs
  to be prepared using the `bulkMode` option, like
  `->prepare($sql, ["bulkMode" => true])`.
- The interface of `BulkResponse` has been made compatible with
  `Collection`, specifically wrt. the `getRows()` method, in order to
  return data from the driver without needing other proprietary methods.
- In order to propagate the non-standard bulk response shape back, the
  user has to select the `PDO::FETCH_NUM` fetch style.
- Documentation: Add two example programs about insert operations
  • Loading branch information
amotl committed May 4, 2023
1 parent efa9345 commit 82d1e2e
Show file tree
Hide file tree
Showing 13 changed files with 607 additions and 55 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ Changelog for crate-pdo
Unreleased
==========

- Added support for `CrateDB bulk operations`_, for improved efficiency on
DML operations.

.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

2022/11/29 2.1.4
================

Expand Down
37 changes: 21 additions & 16 deletions DEVELOP.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ If the environment is outdated, you can upgrade it::

composer update

If you see messages like ``Warning: No code coverage driver available`` when running
the tests, you will need to install the ``xdebug`` extension::

pecl install xdebug

It may happen that you will have to re-install it, for example after your PHP
version has been upgraded by your package manager.


Running the Tests
=================
Expand All @@ -60,16 +68,26 @@ You can run the tests like::
# Run test suite
composer run test

# Run code style checks
composer run style

# Output coverage report as HTML
composer run -- test --coverage-html ./report
open report/index.html

# Run specific tests
composer run -- test --filter "testFetchColumn"


Invoke code style checks
========================

::

# Run code style checks
composer run check-style

# Some code style quirks can be automatically fixed
composer run fix-style



************
Using Docker
Expand Down Expand Up @@ -121,19 +139,6 @@ Running the Tests
open build/multicover/html/index.html


Invoke code style checks
========================

::

# Run code style checks
composer run check-style

# Some code style quirks can be automatically fixed
composer run fix-style



****************************
Working on the documentation
****************************
Expand Down
34 changes: 34 additions & 0 deletions docs/connect.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,40 @@ method used.
| | ``PDO::FETCH_OBJ`` |
+----------------------------+-----------------------+

Bulk operations
===============

With CrateDB :ref:`crate-reference:http-bulk-ops`, suitable for ``INSERT``,
``UPDATE``, and ``DELETE`` statements, you can submit multiple records, aka.
batches, to CrateDB within a single operation. By using this way of communication,
both the client and the server will not waste resources on building and decoding
huge SQL statements, and data will also propagate more efficiently between CrateDB
cluster nodes.

To use this mode, the ``PDOStatement`` offers a corresponding ``bulkMode`` option.
When creating a statement instance with it, the ``$parameters`` data will be
obtained as a **list of records**, like demonstrated in the example below.

Please note that you **must** use ``PDO::FETCH_NUM`` on the fetch operation,
because the response object type ``BulkResponse`` is different than the regular
response type ``Collection``.

.. code-block:: php
// Run insert operation.
$parameters = [[5, 'foo', 1], [6, 'bar', 2], [7, 'foo', 3], [8, 'bar', 4]];
$statement = $connection->prepare(
'INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)',
array("bulkMode" => true));
$statement->execute($parameters);
// Evaluate response.
// MUST use `PDO::FETCH_NUM` for returning bulk operation responses.
print("Total count: {$statement->rowCount()}\n");
$response = $statement->fetchAll(PDO::FETCH_NUM);
print_r($response);
Next steps
==========

Expand Down
18 changes: 17 additions & 1 deletion src/Crate/PDO/Http/ServerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
namespace Crate\PDO\Http;

use Crate\PDO\PDOInterface;
use Crate\Stdlib\BulkResponseInterface;
use Crate\Stdlib\CollectionInterface;

interface ServerInterface
Expand All @@ -49,7 +50,22 @@ public function configure(PDOInterface $PDO): void;
*
* @return CollectionInterface
*/
public function execute(string $queryString, array $parameters): CollectionInterface;
public function execute(string $queryString, array $parameters): ?CollectionInterface;

/**
* Execute the PDOStatement in "bulk operations" and return the response from server
* wrapped inside a BulkResponseInterface.
*
* Bulk operations are only supported for `INSERT`, `UPDATE`, and `DELETE` statements.
*
* https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
*
* @param string $queryString
* @param array $parameters
*
* @return BulkResponseInterface
*/
public function executeBulk(string $queryString, array $parameters): ?BulkResponseInterface;

/**
* @return array
Expand Down
90 changes: 65 additions & 25 deletions src/Crate/PDO/Http/ServerPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
use Crate\PDO\Exception\RuntimeException;
use Crate\PDO\PDO;
use Crate\PDO\PDOInterface;
use Crate\Stdlib\BulkResponse;
use Crate\Stdlib\BulkResponseInterface;
use Crate\Stdlib\Collection;
use Crate\Stdlib\CollectionInterface;
use GuzzleHttp\Client;
Expand Down Expand Up @@ -67,7 +69,7 @@ final class ServerPool implements ServerInterface
/**
* Client constructor.
*
* @param array $servers
* @param array $servers
* @param ClientInterface|null $client
*/
public function __construct(array $servers, ClientInterface $client = null)
Expand All @@ -90,8 +92,27 @@ public function __construct(array $servers, ClientInterface $client = null)
* {@Inheritdoc}
* @throws \GuzzleHttp\Exception\ConnectException
*/
public function execute(string $query, array $parameters): CollectionInterface
public function execute(string $query, array $parameters): ?CollectionInterface
{
return $this->executeGeneric($query, $parameters, false);
}

/**
* {@Inheritdoc}
* @throws \GuzzleHttp\Exception\ConnectException
*/
public function executeBulk(string $query, array $parameters): ?BulkResponseInterface
{
return $this->executeGeneric($query, $parameters, true);
}

/**
* {@Inheritdoc}
* @throws \GuzzleHttp\Exception\ConnectException
*/
private function executeGeneric(string $query, array $parameters, bool $bulk_mode = false)
{
$exception = null;
$numServers = count($this->availableServers) - 1;

for ($i = 0; $i <= $numServers; $i++) {
Expand All @@ -101,24 +122,8 @@ public function execute(string $query, array $parameters): CollectionInterface
// Move the selected server to the end of the stack
$this->availableServers[] = array_shift($this->availableServers);

$options = array_merge($this->httpOptions, [
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
'json' => [
'stmt' => $query,
'args' => $parameters,
],
]);

try {
$response = $this->httpClient->request('POST', '/_sql', $options);
$responseBody = json_decode((string)$response->getBody(), true);

return new Collection(
$responseBody['rows'],
$responseBody['cols'],
$responseBody['duration'],
$responseBody['rowcount']
);
return $this->sendRequest($server, $query, $parameters, $bulk_mode);
} catch (ConnectException $exception) {
// Catch it before the BadResponseException but do nothing.
continue;
Expand All @@ -130,18 +135,53 @@ public function execute(string $query, array $parameters): CollectionInterface
throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
}

$errorCode = $json['error']['code'];
$errorCode = $json['error']['code'];
$errorMessage = $json['error']['message'];

throw new RuntimeException($errorMessage, $errorCode, $exception);
}
}

throw new ConnectException(
sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
$exception->getRequest(),
$exception
);
if ($exception !== null) {
throw new ConnectException(
sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
$exception->getRequest(),
$exception
);
}
}

private function sendRequest(string $server, string $query, array $parameters, bool $bulk_mode = false)
{
$args_name = 'args';
if ($bulk_mode) {
$args_name = 'bulk_args';
}
$options = array_merge($this->httpOptions, [
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
'json' => [
'stmt' => $query,
$args_name => $parameters,
],
]);

$response = $this->httpClient->request('POST', '/_sql', $options);
$responseBody = json_decode((string)$response->getBody(), true);

if ($bulk_mode) {
return new BulkResponse(
$responseBody['results'],
$responseBody['cols'],
$responseBody['duration']
);
} else {
return new Collection(
$responseBody['rows'],
$responseBody['cols'],
$responseBody['duration'],
$responseBody['rowcount']
);
}
}

/**
Expand Down
7 changes: 6 additions & 1 deletion src/Crate/PDO/PDO.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class PDO extends BasePDO implements PDOInterface
'timeout' => 0.0,
'auth' => [],
'defaultSchema' => 'doc',
'bulkMode' => false,
];

/**
Expand Down Expand Up @@ -125,7 +126,11 @@ public function __construct($dsn, $username = null, $passwd = null, $options = [
$this->lastStatement = $statement;

try {
return $this->server->execute($sql, $parameters);
if ($statement->isBulkMode()) {
return $this->server->executeBulk($sql, $parameters);
} else {
return $this->server->execute($sql, $parameters);
}
} catch (Exception\RuntimeException $e) {
if ($this->getAttribute(self::ATTR_ERRMODE) === self::ERRMODE_EXCEPTION) {
throw new Exception\PDOException($e->getMessage(), $e->getCode());
Expand Down
44 changes: 32 additions & 12 deletions src/Crate/PDO/PDOStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class PDOStatement extends BasePDOStatement implements IteratorAggregate
* @var array
*/
private $options = [
'bulkMode' => false,
'fetchMode' => null,
'fetchColumn' => 0,
'fetchClass' => 'array',
Expand Down Expand Up @@ -201,21 +202,17 @@ private function updateBoundColumns(array $row)
/**
* {@inheritDoc}
*/
public function execute($input_parameters = null)
public function execute($input_parameters = null): bool
{
$input_parameters_array = ArrayUtils::toArray($input_parameters);
$zero_based = array_key_exists(0, $input_parameters_array);
foreach ($input_parameters_array as $parameter => $value) {
if (is_int($parameter) && $zero_based) {
$parameter++;
}
$this->bindValue($parameter, $value);
}
$params = ArrayUtils::toArray($input_parameters);

// parameter binding might be unordered, so sort it before execute
ksort($this->parameters);
// In bulk mode, propagate input parameters 1:1.
// In regular mode, translate input parameters to `bindValue` calls.
if ($this->options["bulkMode"] !== true) {
$params = $this->bindValues($params);
}

$result = $this->request->__invoke($this, $this->sql, array_values($this->parameters));
$result = $this->request->__invoke($this, $this->sql, $params);

if (is_array($result)) {
$this->errorCode = $result['code'];
Expand All @@ -229,6 +226,24 @@ public function execute($input_parameters = null)
return true;
}

/**
* Bind `execute`'s $input_parameters values to statement handle.
*/
private function bindValues(array $params_in): array
{
$zero_based = array_key_exists(0, $params_in);
foreach ($params_in as $parameter => $value) {
if (is_int($parameter) && $zero_based) {
$parameter++;
}
$this->bindValue($parameter, $value);
}

// parameter binding might be unordered, so sort it before execute
ksort($this->parameters);
return array_values($this->parameters);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -680,4 +695,9 @@ private function getObjectResult(array $columns, array $row)

return $obj;
}

public function isBulkMode()
{
return $this->options["bulkMode"];
}
}
Loading

0 comments on commit 82d1e2e

Please sign in to comment.