Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
},
"require": {
"php": "^8.4",
"guzzlehttp/promises": "^2.0",
"amphp/amp": "^3.1",
"amphp/http-client": "^5.3",
"guzzlehttp/psr7": "^2.6",
"php-http/client-common": "^2.0",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^2.0",
Expand Down
10 changes: 7 additions & 3 deletions src/Client/ClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace SimPod\ClickHouseClient\Client;

use GuzzleHttp\Promise\PromiseInterface;
use Amp\Future;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
Expand All @@ -15,24 +15,28 @@ interface ClickHouseAsyncClient
/**
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function select(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;

/**
* @param array<string, mixed> $params
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function selectWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;
}
86 changes: 56 additions & 30 deletions src/Client/PsrClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace SimPod\ClickHouseClient\Client;

use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request as AmpRequest;
use Exception;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use Http\Client\HttpAsyncClient;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\RequestInterface;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
Expand All @@ -20,16 +20,22 @@
use SimPod\ClickHouseClient\Settings\SettingsProvider;
use SimPod\ClickHouseClient\Sql\SqlFactory;
use SimPod\ClickHouseClient\Sql\ValueFormatter;
use Throwable;

use function Amp\async;
use function uniqid;

class PsrClickHouseAsyncClient implements ClickHouseAsyncClient
{
private SqlFactory $sqlFactory;

/**

Check failure on line 32 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Coding Standards (8.4)

Found multi-line doc comment with single line content, use one-line doc comment instead.
* @param array<string, string|string[]> $defaultHeaders
*/
public function __construct(
private HttpAsyncClient $asyncClient,
private HttpClient $client,
private RequestFactory $requestFactory,
private array $defaultHeaders = [],
private SqlLogger|null $sqlLogger = null,
private SettingsProvider $defaultSettings = new EmptySettingsProvider(),
) {
Expand All @@ -45,7 +51,7 @@
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
return $this->selectWithParams($query, [], $outputFormat, $settings);
}

Expand All @@ -59,7 +65,7 @@
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
$formatClause = $outputFormat::toSql();

$sql = $this->sqlFactory->createWithParameters($query, $params);
Expand All @@ -71,24 +77,22 @@
CLICKHOUSE,
params: $params,
settings: $settings,
processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output(
$response->getBody()->__toString(),
),
processResponse: static fn (string $body): Output => $outputFormat::output($body),
);
}

/**
* @param array<string, mixed> $params
* @param (callable(ResponseInterface):mixed)|null $processResponse
* @param (callable(string):mixed)|null $processResponse
*
* @throws Exception
*/
private function executeRequest(

Check failure on line 90 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Method SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient::executeRequest() return type with generic class Amp\Future does not specify its types: T
string $sql,
array $params,
SettingsProvider $settings,
callable|null $processResponse,
): PromiseInterface {
): Future {
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
Expand All @@ -100,27 +104,49 @@
),
);

$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);
return async(function () use ($processResponse, $request, $sql): mixed {
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

try {
$response = $this->client->request($this->toAmpRequest($request));
$body = $response->getBody()->buffer();

Check failure on line 113 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Coding Standards (8.4)

Equals sign not aligned with surrounding assignments; expected 5 spaces but found 1 space

$this->sqlLogger?->stopQuery($id);

if ($response->getStatus() !== 200) {
throw ServerError::fromResponseContent($body, $response->getStatus());
}

return Create::promiseFor(
$this->asyncClient->sendAsyncRequest($request),
)
->then(
function (ResponseInterface $response) use ($id, $processResponse) {
$this->sqlLogger?->stopQuery($id);
if ($processResponse === null) {
return $body;
}

return $processResponse($body);
} catch (Throwable $throwable) {
$this->sqlLogger?->stopQuery($id);

Check warning on line 127 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ return $processResponse($body); } catch (Throwable $throwable) { - $this->sqlLogger?->stopQuery($id); + throw $throwable; }

throw $throwable;
}
});
}

private function toAmpRequest(RequestInterface $request): AmpRequest
{
$ampRequest = new AmpRequest(
$request->getUri(),
$request->getMethod(),

Check failure on line 138 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Parameter #2 $method of class Amp\Http\Client\Request constructor expects non-empty-string, string given.
$request->getBody()->__toString(),
);

if ($response->getStatusCode() !== 200) {
throw ServerError::fromResponse($response);
}
foreach ($this->defaultHeaders as $name => $values) {

Check warning on line 142 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Foreach_": @@ @@ $request->getBody()->__toString(), ); - foreach ($this->defaultHeaders as $name => $values) { + foreach ([] as $name => $values) { $ampRequest->setHeader($name, $values); }
$ampRequest->setHeader($name, $values);

Check failure on line 143 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Parameter #1 $name of method Amp\Http\Client\Request::setHeader() expects non-empty-string, string given.

Check failure on line 143 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Method SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient::toAmpRequest() throws checked exception Error but it's missing from the PHPDoc @throws tag.
}

if ($processResponse === null) {
return $response;
}
foreach ($request->getHeaders() as $name => $values) {

Check warning on line 146 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Foreach_": @@ @@ $ampRequest->setHeader($name, $values); } - foreach ($request->getHeaders() as $name => $values) { + foreach ([] as $name => $values) { $ampRequest->setHeader($name, $values); }
$ampRequest->setHeader($name, $values);

Check failure on line 147 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Parameter #1 $name of method Amp\Http\Client\Request::setHeader() expects non-empty-string, (int|string) given.

Check failure on line 147 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Method SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient::toAmpRequest() throws checked exception Error but it's missing from the PHPDoc @throws tag.
}

return $processResponse($response);
},
fn () => $this->sqlLogger?->stopQuery($id),
);
return $ampRequest;
}
}
7 changes: 5 additions & 2 deletions src/Exception/ServerError.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ private function __construct(

public static function fromResponse(ResponseInterface $response): self
{
$bodyContent = $response->getBody()->__toString();
return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode());
}

public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self
{
$errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1
? (int) $codeMatches[1]
: 0;
Expand All @@ -35,7 +38,7 @@ public static function fromResponse(ResponseInterface $response): self
return new self(
$bodyContent,
$errorCode,
$response->getStatusCode(),
$httpStatusCode,
$exceptionName,
);
}
Expand Down
9 changes: 5 additions & 4 deletions tests/Client/SelectAsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace SimPod\ClickHouseClient\Tests\Client;

use GuzzleHttp\Promise\Utils;
use PHPUnit\Framework\Attributes\CoversClass;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient;
Expand All @@ -15,6 +14,8 @@
use SimPod\ClickHouseClient\Tests\TestCaseBase;
use SimPod\ClickHouseClient\Tests\WithClient;

use function Amp\Future\await;

#[CoversClass(RequestFactory::class)]
#[CoversClass(PsrClickHouseAsyncClient::class)]
#[CoversClass(ServerError::class)]
Expand All @@ -35,7 +36,7 @@ public function testAsyncSelect(): void
/** @var Json<array{number: int|string}> $format */
$format = new Json();

$promises = [
$futures = [
$client->select($sql, $format),
$client->select($sql, $format),
];
Expand All @@ -46,7 +47,7 @@ public function testAsyncSelect(): void
* \SimPod\ClickHouseClient\Output\Json<array{number: int|string}>
* } $jsonOutputs
*/
$jsonOutputs = Utils::all($promises)->wait();
$jsonOutputs = await($futures);

$expectedData = ClickHouseVersion::quotes64BitIntegersInJson()
? [['number' => '0'], ['number' => '1']]
Expand All @@ -60,6 +61,6 @@ public function testSelectFromNonExistentTableExpectServerError(): void
{
$this->expectException(ServerError::class);

self::$asyncClient->select('table', new TabSeparated())->wait();
self::$asyncClient->select('table', new TabSeparated())->await();
}
}
13 changes: 5 additions & 8 deletions tests/WithClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace SimPod\ClickHouseClient\Tests;

use Amp\Http\Client\HttpClientBuilder;
use InvalidArgumentException;
use Nyholm\Psr7\Factory\Psr17Factory;
use PHPUnit\Framework\Attributes\After;
Expand All @@ -18,12 +19,12 @@
use SimPod\ClickHouseClient\Exception\ServerError;
use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry;
use Symfony\Component\HttpClient\CurlHttpClient;
use Symfony\Component\HttpClient\HttplugClient;
use Symfony\Component\HttpClient\Psr18Client;

use function assert;
use function getenv;
use function is_string;
use function rawurlencode;
use function sprintf;
use function time;

Expand Down Expand Up @@ -111,19 +112,15 @@ private static function restartClickHouseClient(): void
);

static::$asyncClient = new PsrClickHouseAsyncClient(
new HttplugClient(
new CurlHttpClient([
'base_uri' => $endpoint,
'headers' => $headers,
'query' => ['database' => static::$currentDbName],
]),
),
HttpClientBuilder::buildDefault(),
new RequestFactory(
new ParamValueConverterRegistry(),
new Psr17Factory(),
new Psr17Factory(),
new Psr17Factory(),
$endpoint . '?database=' . rawurlencode(static::$currentDbName),
),
$headers,
);

static::$controllerClient->executeQuery(sprintf('DROP DATABASE IF EXISTS "%s"', static::$currentDbName));
Expand Down
Loading