diff --git a/composer.json b/composer.json index e8cd916..90ec8aa 100644 --- a/composer.json +++ b/composer.json @@ -29,9 +29,10 @@ }, "require": { "php": "^8.4", - "guzzlehttp/promises": "^2.0", + "amphp/amp": "^3.1", + "amphp/http-client": "^5.3", + "amphp/http-client-psr7": "^1.1", "guzzlehttp/psr7": "^2.6", - "php-http/client-common": "^2.0", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", "psr/http-message": "^2.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index b40590b..71081f2 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,7 +4,8 @@ namespace SimPod\ClickHouseClient\Client; -use GuzzleHttp\Promise\PromiseInterface; +use Amp\ByteStream\Payload; +use Amp\Future; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; @@ -15,18 +16,22 @@ interface ClickHouseAsyncClient /** * @param Format $outputFormat * + * @return Future + * * @template O of Output */ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; + ): Future; /** * @param array $params * @param Format $outputFormat * + * @return Future + * * @template O of Output */ public function selectWithParams( @@ -34,5 +39,33 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; + ): Future; + + /** + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + + /** + * @param array $params + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index f0938bb..8db0bfc 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,32 +4,39 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; +use Amp\Future; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request as AmpRequest; +use Error; use Exception; -use GuzzleHttp\Promise\Create; -use GuzzleHttp\Promise\PromiseInterface; -use Http\Client\HttpAsyncClient; -use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; -use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; +use Throwable; +use function Amp\async; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; + /** @param array $defaultHeaders */ public function __construct( - private HttpAsyncClient $asyncClient, + private HttpClient $client, private RequestFactory $requestFactory, + private PsrAdapter $psrAdapter, + private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), ) { @@ -45,7 +52,7 @@ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { return $this->selectWithParams($query, [], $outputFormat, $settings); } @@ -59,7 +66,7 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { $formatClause = $outputFormat::toSql(); $sql = $this->sqlFactory->createWithParameters($query, $params); @@ -71,24 +78,64 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output( - $response->getBody()->__toString(), - ), + processResponse: static fn (string $body) => $outputFormat::output($body), + ); + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectStreamWithParams($query, [], $outputFormat, $settings); + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + $formatClause = $outputFormat::toSql(); + + $sql = $this->sqlFactory->createWithParameters($query, $params); + + return $this->executeStreamRequest( + << $params - * @param (callable(ResponseInterface):mixed)|null $processResponse + * @param callable(string):T $processResponse + * + * @return Future * * @throws Exception + * + * @template T */ private function executeRequest( string $sql, array $params, SettingsProvider $settings, - callable|null $processResponse, - ): PromiseInterface { + callable $processResponse, + ): Future { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -100,27 +147,81 @@ private function executeRequest( ), ); - $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); + /** @var Future $future */ + $future = async(function () use ($processResponse, $request, $sql): mixed { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $response = $this->client->request($this->toAmpRequest($request)); + $body = $response->getBody()->buffer(); + + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent($body, $response->getStatus()); + } + + return $processResponse($body); + } finally { + $this->sqlLogger?->stopQuery($id); + } + }); + + return $future; + } - return Create::promiseFor( - $this->asyncClient->sendAsyncRequest($request), - ) - ->then( - function (ResponseInterface $response) use ($id, $processResponse) { - $this->sqlLogger?->stopQuery($id); + /** + * @param array $params + * + * @return Future + * + * @throws Exception + */ + private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future + { + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( + $this->defaultSettings, + $settings, + ), + new RequestOptions( + $params, + ), + ); + + /** @var Future $future */ + $future = async(function () use ($request, $sql): Payload { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $response = $this->client->request($this->toAmpRequest($request)); + + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); + } + + return $response->getBody(); + } finally { + $this->sqlLogger?->stopQuery($id); + } + }); + + return $future; + } - if ($response->getStatusCode() !== 200) { - throw ServerError::fromResponse($response); - } + /** @throws Error */ + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); - if ($processResponse === null) { - return $response; - } + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } - return $processResponse($response); - }, - fn () => $this->sqlLogger?->stopQuery($id), - ); + return $ampRequest; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index 1ffe240..2642e4c 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -22,8 +22,11 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - $bodyContent = $response->getBody()->__toString(); + return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode()); + } + public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self + { $errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -35,7 +38,7 @@ public static function fromResponse(ResponseInterface $response): self return new self( $bodyContent, $errorCode, - $response->getStatusCode(), + $httpStatusCode, $exceptionName, ); } diff --git a/tests/Client/PsrClickHouseAsyncClientTest.php b/tests/Client/PsrClickHouseAsyncClientTest.php new file mode 100644 index 0000000..b3816d3 --- /dev/null +++ b/tests/Client/PsrClickHouseAsyncClientTest.php @@ -0,0 +1,65 @@ +request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + } +} diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index bda77ff..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,7 +4,6 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use GuzzleHttp\Promise\Utils; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; @@ -15,6 +14,8 @@ use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; +use function Amp\Future\await; + #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(ServerError::class)] @@ -35,7 +36,7 @@ public function testAsyncSelect(): void /** @var Json $format */ $format = new Json(); - $promises = [ + $futures = [ $client->select($sql, $format), $client->select($sql, $format), ]; @@ -46,7 +47,7 @@ public function testAsyncSelect(): void * \SimPod\ClickHouseClient\Output\Json * } $jsonOutputs */ - $jsonOutputs = Utils::all($promises)->wait(); + $jsonOutputs = await($futures); $expectedData = ClickHouseVersion::quotes64BitIntegersInJson() ? [['number' => '0'], ['number' => '1']] @@ -60,6 +61,24 @@ public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); - self::$asyncClient->select('table', new TabSeparated())->wait(); + self::$asyncClient->select('table', new TabSeparated())->await(); + } + + public function testAsyncSelectStream(): void + { + $stream = self::$asyncClient->selectStream('SELECT 1 AS data', new TabSeparated())->await(); + + self::assertSame("1\n", $stream->buffer()); + } + + public function testAsyncSelectStreamWithParams(): void + { + $stream = self::$asyncClient->selectStreamWithParams( + 'SELECT {p1:UInt8} AS data', + ['p1' => 3], + new TabSeparated(), + )->await(); + + self::assertSame("3\n", $stream->buffer()); } } diff --git a/tests/WithClient.php b/tests/WithClient.php index cea96f4..d3095ef 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -4,6 +4,8 @@ namespace SimPod\ClickHouseClient\Tests; +use Amp\Http\Client\HttpClientBuilder; +use Amp\Http\Client\Psr7\PsrAdapter; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -18,12 +20,12 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use Symfony\Component\HttpClient\CurlHttpClient; -use Symfony\Component\HttpClient\HttplugClient; use Symfony\Component\HttpClient\Psr18Client; use function assert; use function getenv; use function is_string; +use function rawurlencode; use function sprintf; use function time; @@ -111,19 +113,16 @@ private static function restartClickHouseClient(): void ); static::$asyncClient = new PsrClickHouseAsyncClient( - new HttplugClient( - new CurlHttpClient([ - 'base_uri' => $endpoint, - 'headers' => $headers, - 'query' => ['database' => static::$currentDbName], - ]), - ), + HttpClientBuilder::buildDefault(), new RequestFactory( new ParamValueConverterRegistry(), new Psr17Factory(), new Psr17Factory(), new Psr17Factory(), + $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + new PsrAdapter(new Psr17Factory(), new Psr17Factory()), + $headers, ); static::$controllerClient->executeQuery(sprintf('DROP DATABASE IF EXISTS "%s"', static::$currentDbName));