From 429ac26ed9e7bd54e08e2a2e47c9fd4fba07cff2 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 09:01:50 +0300 Subject: [PATCH 1/9] feat(async): add fiber future select methods --- composer.json | 1 + src/Client/ClickHouseAsyncClient.php | 29 ++++++++++++++ src/Client/PsrClickHouseAsyncClient.php | 52 +++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/composer.json b/composer.json index e8cd916..e0a4a84 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,7 @@ }, "require": { "php": "^8.4", + "amphp/amp": "^3.1", "guzzlehttp/promises": "^2.0", "guzzlehttp/psr7": "^2.6", "php-http/client-common": "^2.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index b40590b..9824dba 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\Future; use GuzzleHttp\Promise\PromiseInterface; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -23,6 +24,19 @@ public function select( SettingsProvider $settings = new EmptySettingsProvider(), ): PromiseInterface; + /** + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectFuture( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + /** * @param array $params * @param Format $outputFormat @@ -35,4 +49,19 @@ public function selectWithParams( Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): PromiseInterface; + + /** + * @param array $params + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectWithParamsFuture( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index f0938bb..e470800 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,11 +4,14 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\DeferredFuture; +use Amp\Future; use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -20,6 +23,7 @@ use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; +use Throwable; use function uniqid; @@ -49,6 +53,19 @@ public function select( return $this->selectWithParams($query, [], $outputFormat, $settings); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectFuture( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectWithParamsFuture($query, [], $outputFormat, $settings); + } + /** * {@inheritDoc} * @@ -77,6 +94,41 @@ public function selectWithParams( ); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectWithParamsFuture( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return self::futureForPromise($this->selectWithParams($query, $params, $outputFormat, $settings)); + } + + /** + * @param PromiseInterface $promise + * + * @return Future + * + * @template T + */ + private static function futureForPromise(PromiseInterface $promise): Future + { + $deferred = new DeferredFuture(); + + $promise->then( + static fn (mixed $value) => $deferred->complete($value), + static fn (mixed $reason) => $deferred->error( + $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), + ), + ); + + return $deferred->getFuture(); + } + /** * @param array $params * @param (callable(ResponseInterface):mixed)|null $processResponse From ee48d0104758d5725b5a2822859abcfb26c71c43 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 10:37:21 +0300 Subject: [PATCH 2/9] refactor(async): return futures from async client --- composer.json | 1 - src/Client/ClickHouseAsyncClient.php | 29 +------- src/Client/PsrClickHouseAsyncClient.php | 89 ++++++++----------------- tests/Client/SelectAsyncTest.php | 9 +-- 4 files changed, 33 insertions(+), 95 deletions(-) diff --git a/composer.json b/composer.json index e0a4a84..f925bc5 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,6 @@ "require": { "php": "^8.4", "amphp/amp": "^3.1", - "guzzlehttp/promises": "^2.0", "guzzlehttp/psr7": "^2.6", "php-http/client-common": "^2.0", "psr/http-client": "^1.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index 9824dba..d61c00c 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -5,7 +5,6 @@ namespace SimPod\ClickHouseClient\Client; use Amp\Future; -use GuzzleHttp\Promise\PromiseInterface; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; @@ -13,17 +12,6 @@ interface ClickHouseAsyncClient { - /** - * @param Format $outputFormat - * - * @template O of Output - */ - public function select( - string $query, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; - /** * @param Format $outputFormat * @@ -31,25 +19,12 @@ public function select( * * @template O of Output */ - public function selectFuture( + public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): Future; - /** - * @param array $params - * @param Format $outputFormat - * - * @template O of Output - */ - public function selectWithParams( - string $query, - array $params, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; - /** * @param array $params * @param Format $outputFormat @@ -58,7 +33,7 @@ public function selectWithParams( * * @template O of Output */ - public function selectWithParamsFuture( + public function selectWithParams( string $query, array $params, Format $outputFormat, diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index e470800..629e71d 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,8 +7,6 @@ use Amp\DeferredFuture; use Amp\Future; use Exception; -use GuzzleHttp\Promise\Create; -use GuzzleHttp\Promise\PromiseInterface; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; use RuntimeException; @@ -49,21 +47,8 @@ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { - return $this->selectWithParams($query, [], $outputFormat, $settings); - } - - /** - * {@inheritDoc} - * - * @throws Exception - */ - public function selectFuture( - string $query, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), ): Future { - return $this->selectWithParamsFuture($query, [], $outputFormat, $settings); + return $this->selectWithParams($query, [], $outputFormat, $settings); } /** @@ -76,7 +61,7 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { $formatClause = $outputFormat::toSql(); $sql = $this->sqlFactory->createWithParameters($query, $params); @@ -94,41 +79,6 @@ public function selectWithParams( ); } - /** - * {@inheritDoc} - * - * @throws Exception - */ - public function selectWithParamsFuture( - string $query, - array $params, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): Future { - return self::futureForPromise($this->selectWithParams($query, $params, $outputFormat, $settings)); - } - - /** - * @param PromiseInterface $promise - * - * @return Future - * - * @template T - */ - private static function futureForPromise(PromiseInterface $promise): Future - { - $deferred = new DeferredFuture(); - - $promise->then( - static fn (mixed $value) => $deferred->complete($value), - static fn (mixed $reason) => $deferred->error( - $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), - ), - ); - - return $deferred->getFuture(); - } - /** * @param array $params * @param (callable(ResponseInterface):mixed)|null $processResponse @@ -140,7 +90,7 @@ private function executeRequest( array $params, SettingsProvider $settings, callable|null $processResponse, - ): PromiseInterface { + ): Future { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -155,11 +105,11 @@ private function executeRequest( $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); - return Create::promiseFor( - $this->asyncClient->sendAsyncRequest($request), - ) - ->then( - function (ResponseInterface $response) use ($id, $processResponse) { + $deferred = new DeferredFuture(); + + $this->asyncClient->sendAsyncRequest($request)->then( + function (ResponseInterface $response) use ($deferred, $id, $processResponse): void { + try { $this->sqlLogger?->stopQuery($id); if ($response->getStatusCode() !== 200) { @@ -167,12 +117,25 @@ function (ResponseInterface $response) use ($id, $processResponse) { } if ($processResponse === null) { - return $response; + $deferred->complete($response); + + return; } - return $processResponse($response); - }, - fn () => $this->sqlLogger?->stopQuery($id), - ); + $deferred->complete($processResponse($response)); + } catch (Throwable $throwable) { + $deferred->error($throwable); + } + }, + function (mixed $reason) use ($deferred, $id): void { + $this->sqlLogger?->stopQuery($id); + + $deferred->error( + $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), + ); + }, + ); + + return $deferred->getFuture(); } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index bda77ff..acdfc5c 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,7 +4,6 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use GuzzleHttp\Promise\Utils; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; @@ -15,6 +14,8 @@ use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; +use function Amp\Future\await; + #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(ServerError::class)] @@ -35,7 +36,7 @@ public function testAsyncSelect(): void /** @var Json $format */ $format = new Json(); - $promises = [ + $futures = [ $client->select($sql, $format), $client->select($sql, $format), ]; @@ -46,7 +47,7 @@ public function testAsyncSelect(): void * \SimPod\ClickHouseClient\Output\Json * } $jsonOutputs */ - $jsonOutputs = Utils::all($promises)->wait(); + $jsonOutputs = await($futures); $expectedData = ClickHouseVersion::quotes64BitIntegersInJson() ? [['number' => '0'], ['number' => '1']] @@ -60,6 +61,6 @@ public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); - self::$asyncClient->select('table', new TabSeparated())->wait(); + self::$asyncClient->select('table', new TabSeparated())->await(); } } From 5bbee57e96b7a763318e0060cab748129aca8845 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 10:49:24 +0300 Subject: [PATCH 3/9] refactor(async): use amp http client transport --- composer.json | 2 +- src/Client/PsrClickHouseAsyncClient.php | 81 ++++++++++++++----------- src/Exception/ServerError.php | 7 ++- tests/WithClient.php | 13 ++-- 4 files changed, 57 insertions(+), 46 deletions(-) diff --git a/composer.json b/composer.json index f925bc5..b932f04 100644 --- a/composer.json +++ b/composer.json @@ -30,8 +30,8 @@ "require": { "php": "^8.4", "amphp/amp": "^3.1", + "amphp/http-client": "^5.3", "guzzlehttp/psr7": "^2.6", - "php-http/client-common": "^2.0", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", "psr/http-message": "^2.0", diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 629e71d..687e70c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,12 +4,11 @@ namespace SimPod\ClickHouseClient\Client; -use Amp\DeferredFuture; use Amp\Future; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Request as AmpRequest; use Exception; -use Http\Client\HttpAsyncClient; -use Psr\Http\Message\ResponseInterface; -use RuntimeException; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -23,15 +22,20 @@ use SimPod\ClickHouseClient\Sql\ValueFormatter; use Throwable; +use function Amp\async; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; + /** + * @param array $defaultHeaders + */ public function __construct( - private HttpAsyncClient $asyncClient, + private HttpClient $client, private RequestFactory $requestFactory, + private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), ) { @@ -73,15 +77,13 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output( - $response->getBody()->__toString(), - ), + processResponse: static fn (string $body): Output => $outputFormat::output($body), ); } /** * @param array $params - * @param (callable(ResponseInterface):mixed)|null $processResponse + * @param (callable(string):mixed)|null $processResponse * * @throws Exception */ @@ -102,40 +104,49 @@ private function executeRequest( ), ); - $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); + return async(function () use ($processResponse, $request, $sql): mixed { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); - $deferred = new DeferredFuture(); + try { + $response = $this->client->request($this->toAmpRequest($request)); + $body = $response->getBody()->buffer(); - $this->asyncClient->sendAsyncRequest($request)->then( - function (ResponseInterface $response) use ($deferred, $id, $processResponse): void { - try { - $this->sqlLogger?->stopQuery($id); - - if ($response->getStatusCode() !== 200) { - throw ServerError::fromResponse($response); - } - - if ($processResponse === null) { - $deferred->complete($response); + $this->sqlLogger?->stopQuery($id); - return; - } + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent($body, $response->getStatus()); + } - $deferred->complete($processResponse($response)); - } catch (Throwable $throwable) { - $deferred->error($throwable); + if ($processResponse === null) { + return $body; } - }, - function (mixed $reason) use ($deferred, $id): void { + + return $processResponse($body); + } catch (Throwable $throwable) { $this->sqlLogger?->stopQuery($id); - $deferred->error( - $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), - ); - }, + throw $throwable; + } + }); + } + + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = new AmpRequest( + $request->getUri(), + $request->getMethod(), + $request->getBody()->__toString(), ); - return $deferred->getFuture(); + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + foreach ($request->getHeaders() as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + return $ampRequest; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index 1ffe240..2642e4c 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -22,8 +22,11 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - $bodyContent = $response->getBody()->__toString(); + return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode()); + } + public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self + { $errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -35,7 +38,7 @@ public static function fromResponse(ResponseInterface $response): self return new self( $bodyContent, $errorCode, - $response->getStatusCode(), + $httpStatusCode, $exceptionName, ); } diff --git a/tests/WithClient.php b/tests/WithClient.php index cea96f4..f996e9e 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Tests; +use Amp\Http\Client\HttpClientBuilder; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -18,12 +19,12 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use Symfony\Component\HttpClient\CurlHttpClient; -use Symfony\Component\HttpClient\HttplugClient; use Symfony\Component\HttpClient\Psr18Client; use function assert; use function getenv; use function is_string; +use function rawurlencode; use function sprintf; use function time; @@ -111,19 +112,15 @@ private static function restartClickHouseClient(): void ); static::$asyncClient = new PsrClickHouseAsyncClient( - new HttplugClient( - new CurlHttpClient([ - 'base_uri' => $endpoint, - 'headers' => $headers, - 'query' => ['database' => static::$currentDbName], - ]), - ), + HttpClientBuilder::buildDefault(), new RequestFactory( new ParamValueConverterRegistry(), new Psr17Factory(), new Psr17Factory(), new Psr17Factory(), + $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + $headers, ); static::$controllerClient->executeQuery(sprintf('DROP DATABASE IF EXISTS "%s"', static::$currentDbName)); From 6ad42b8fd0bee01f57b79a5de4da3e087eaa6f85 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 11:17:05 +0300 Subject: [PATCH 4/9] refactor(async): use amp psr7 adapter --- composer.json | 1 + src/Client/PsrClickHouseAsyncClient.php | 52 ++++++++++--------------- tests/WithClient.php | 2 + 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/composer.json b/composer.json index b932f04..90ec8aa 100644 --- a/composer.json +++ b/composer.json @@ -31,6 +31,7 @@ "php": "^8.4", "amphp/amp": "^3.1", "amphp/http-client": "^5.3", + "amphp/http-client-psr7": "^1.1", "guzzlehttp/psr7": "^2.6", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 687e70c..42aefb0 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -6,9 +6,8 @@ use Amp\Future; use Amp\Http\Client\HttpClient; -use Amp\Http\Client\Request as AmpRequest; +use Amp\Http\Client\Psr7\PsrAdapter; use Exception; -use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -29,12 +28,11 @@ class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; - /** - * @param array $defaultHeaders - */ + /** @param array $defaultHeaders */ public function __construct( private HttpClient $client, private RequestFactory $requestFactory, + private PsrAdapter $psrAdapter, private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), @@ -83,15 +81,19 @@ public function selectWithParams( /** * @param array $params - * @param (callable(string):mixed)|null $processResponse + * @param callable(string):T $processResponse + * + * @return Future * * @throws Exception + * + * @template T */ private function executeRequest( string $sql, array $params, SettingsProvider $settings, - callable|null $processResponse, + callable $processResponse, ): Future { $request = $this->requestFactory->prepareSqlRequest( $sql, @@ -104,13 +106,20 @@ private function executeRequest( ), ); - return async(function () use ($processResponse, $request, $sql): mixed { + /** @var Future $future */ + $future = async(function () use ($processResponse, $request, $sql): mixed { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); try { - $response = $this->client->request($this->toAmpRequest($request)); - $body = $response->getBody()->buffer(); + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + $response = $this->client->request($ampRequest); + $body = $response->getBody()->buffer(); $this->sqlLogger?->stopQuery($id); @@ -118,10 +127,6 @@ private function executeRequest( throw ServerError::fromResponseContent($body, $response->getStatus()); } - if ($processResponse === null) { - return $body; - } - return $processResponse($body); } catch (Throwable $throwable) { $this->sqlLogger?->stopQuery($id); @@ -129,24 +134,7 @@ private function executeRequest( throw $throwable; } }); - } - - private function toAmpRequest(RequestInterface $request): AmpRequest - { - $ampRequest = new AmpRequest( - $request->getUri(), - $request->getMethod(), - $request->getBody()->__toString(), - ); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - foreach ($request->getHeaders() as $name => $values) { - $ampRequest->setHeader($name, $values); - } - return $ampRequest; + return $future; } } diff --git a/tests/WithClient.php b/tests/WithClient.php index f996e9e..d3095ef 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -5,6 +5,7 @@ namespace SimPod\ClickHouseClient\Tests; use Amp\Http\Client\HttpClientBuilder; +use Amp\Http\Client\Psr7\PsrAdapter; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -120,6 +121,7 @@ private static function restartClickHouseClient(): void new Psr17Factory(), $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + new PsrAdapter(new Psr17Factory(), new Psr17Factory()), $headers, ); From 0ea22162eb09259b2035c2904fbf255cdc8db354 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 11:42:10 +0300 Subject: [PATCH 5/9] style(async): remove arrow function return type --- src/Client/PsrClickHouseAsyncClient.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 42aefb0..ea17586 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -14,7 +14,6 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; -use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; @@ -75,7 +74,7 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (string $body): Output => $outputFormat::output($body), + processResponse: static fn (string $body) => $outputFormat::output($body), ); } From a218cbdf7c55430d826fba23f21a8262197b3871 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:21:26 +0300 Subject: [PATCH 6/9] feat(async): add select stream methods --- src/Client/ClickHouseAsyncClient.php | 29 ++++++++ src/Client/PsrClickHouseAsyncClient.php | 92 +++++++++++++++++++++++++ tests/Client/SelectAsyncTest.php | 18 +++++ 3 files changed, 139 insertions(+) diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index d61c00c..71081f2 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; use Amp\Future; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -39,4 +40,32 @@ public function selectWithParams( Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): Future; + + /** + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + + /** + * @param array $params + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index ea17586..38d325c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; use Amp\Future; use Amp\Http\Client\HttpClient; use Amp\Http\Client\Psr7\PsrAdapter; @@ -78,6 +79,44 @@ public function selectWithParams( ); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectStreamWithParams($query, [], $outputFormat, $settings); + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + $formatClause = $outputFormat::toSql(); + + $sql = $this->sqlFactory->createWithParameters($query, $params); + + return $this->executeStreamRequest( + << $params * @param callable(string):T $processResponse @@ -136,4 +175,57 @@ private function executeRequest( return $future; } + + /** + * @param array $params + * + * @return Future + * + * @throws Exception + */ + private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future + { + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( + $this->defaultSettings, + $settings, + ), + new RequestOptions( + $params, + ), + ); + + /** @var Future $future */ + $future = async(function () use ($request, $sql): Payload { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + $response = $this->client->request($ampRequest); + $this->sqlLogger?->stopQuery($id); + + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); + } + + return $response->getBody(); + } catch (Throwable $throwable) { + $this->sqlLogger?->stopQuery($id); + + throw $throwable; + } + }); + + return $future; + } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index acdfc5c..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -63,4 +63,22 @@ public function testSelectFromNonExistentTableExpectServerError(): void self::$asyncClient->select('table', new TabSeparated())->await(); } + + public function testAsyncSelectStream(): void + { + $stream = self::$asyncClient->selectStream('SELECT 1 AS data', new TabSeparated())->await(); + + self::assertSame("1\n", $stream->buffer()); + } + + public function testAsyncSelectStreamWithParams(): void + { + $stream = self::$asyncClient->selectStreamWithParams( + 'SELECT {p1:UInt8} AS data', + ['p1' => 3], + new TabSeparated(), + )->await(); + + self::assertSame("3\n", $stream->buffer()); + } } From 4ff58db09eb4c1d405dd1286edf858b0685112d3 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:34:53 +0300 Subject: [PATCH 7/9] test(async): cover default request headers --- src/Client/PsrClickHouseAsyncClient.php | 31 ++++++++-------- tests/Client/SelectAsyncTest.php | 48 +++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 38d325c..e0f64f4 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -8,7 +8,10 @@ use Amp\Future; use Amp\Http\Client\HttpClient; use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request as AmpRequest; +use Error; use Exception; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -150,13 +153,7 @@ private function executeRequest( $this->sqlLogger?->startQuery($id, $sql); try { - $ampRequest = $this->psrAdapter->fromPsrRequest($request); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - $response = $this->client->request($ampRequest); + $response = $this->client->request($this->toAmpRequest($request)); $body = $response->getBody()->buffer(); $this->sqlLogger?->stopQuery($id); @@ -202,13 +199,7 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid $this->sqlLogger?->startQuery($id, $sql); try { - $ampRequest = $this->psrAdapter->fromPsrRequest($request); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - $response = $this->client->request($ampRequest); + $response = $this->client->request($this->toAmpRequest($request)); $this->sqlLogger?->stopQuery($id); if ($response->getStatus() !== 200) { @@ -228,4 +219,16 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid return $future; } + + /** @throws Error */ + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + return $ampRequest; + } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index 46f4fd7..1a33e19 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,12 +4,21 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Amp\Cancellation; +use Amp\Http\Client\DelegateHttpClient; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request; +use Amp\Http\Client\Response; +use Amp\Http\InvalidHeaderException; +use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Json; use SimPod\ClickHouseClient\Format\TabSeparated; +use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\ClickHouseVersion; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -57,6 +66,45 @@ public function testAsyncSelect(): void self::assertSame($expectedData, $jsonOutputs[1]->data); } + public function testDefaultHeadersAreSent(): void + { + $delegate = new class implements DelegateHttpClient { + public Request|null $request = null; + + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response + { + $this->request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + } + public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); From ea89b3ef197f44ca0f18cd355d337a94c9bdd246 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:56:36 +0300 Subject: [PATCH 8/9] test(async): isolate default header coverage --- tests/Client/PsrClickHouseAsyncClientTest.php | 65 +++++++++++++++++++ tests/Client/SelectAsyncTest.php | 48 -------------- 2 files changed, 65 insertions(+), 48 deletions(-) create mode 100644 tests/Client/PsrClickHouseAsyncClientTest.php diff --git a/tests/Client/PsrClickHouseAsyncClientTest.php b/tests/Client/PsrClickHouseAsyncClientTest.php new file mode 100644 index 0000000..b3816d3 --- /dev/null +++ b/tests/Client/PsrClickHouseAsyncClientTest.php @@ -0,0 +1,65 @@ +request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + } +} diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index 1a33e19..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,21 +4,12 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use Amp\Cancellation; -use Amp\Http\Client\DelegateHttpClient; -use Amp\Http\Client\HttpClient; -use Amp\Http\Client\Psr7\PsrAdapter; -use Amp\Http\Client\Request; -use Amp\Http\Client\Response; -use Amp\Http\InvalidHeaderException; -use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Json; use SimPod\ClickHouseClient\Format\TabSeparated; -use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\ClickHouseVersion; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -66,45 +57,6 @@ public function testAsyncSelect(): void self::assertSame($expectedData, $jsonOutputs[1]->data); } - public function testDefaultHeadersAreSent(): void - { - $delegate = new class implements DelegateHttpClient { - public Request|null $request = null; - - /** @throws InvalidHeaderException */ - public function request(Request $request, Cancellation $cancellation): Response - { - $this->request = $request; - - return new Response('1.1', 200, null, [], "1\n", $request); - } - }; - - $psr17Factory = new Psr17Factory(); - $client = new PsrClickHouseAsyncClient( - new HttpClient($delegate, []), - new RequestFactory( - new ParamValueConverterRegistry(), - $psr17Factory, - $psr17Factory, - $psr17Factory, - 'https://clickhouse.example', - ), - new PsrAdapter($psr17Factory, $psr17Factory), - [ - 'X-ClickHouse-Key' => 'secret', - 'X-ClickHouse-User' => 'user', - ], - ); - - $client->select('SELECT 1', new TabSeparated())->await(); - - $request = $delegate->request; - self::assertInstanceOf(Request::class, $request); - self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); - self::assertSame('user', $request->getHeader('X-ClickHouse-User')); - } - public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); From 067a4319c76e531f26e9ceffe11781cccd1a4e44 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 14:51:19 +0300 Subject: [PATCH 9/9] fix(async): stop query logging once --- src/Client/PsrClickHouseAsyncClient.php | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index e0f64f4..8db0bfc 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -156,17 +156,13 @@ private function executeRequest( $response = $this->client->request($this->toAmpRequest($request)); $body = $response->getBody()->buffer(); - $this->sqlLogger?->stopQuery($id); - if ($response->getStatus() !== 200) { throw ServerError::fromResponseContent($body, $response->getStatus()); } return $processResponse($body); - } catch (Throwable $throwable) { + } finally { $this->sqlLogger?->stopQuery($id); - - throw $throwable; } }); @@ -200,7 +196,6 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid try { $response = $this->client->request($this->toAmpRequest($request)); - $this->sqlLogger?->stopQuery($id); if ($response->getStatus() !== 200) { throw ServerError::fromResponseContent( @@ -210,10 +205,8 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid } return $response->getBody(); - } catch (Throwable $throwable) { + } finally { $this->sqlLogger?->stopQuery($id); - - throw $throwable; } });