Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
},
"require": {
"php": "^8.4",
"guzzlehttp/promises": "^2.0",
"amphp/amp": "^3.1",
"amphp/http-client": "^5.3",
"amphp/http-client-psr7": "^1.1",
"guzzlehttp/psr7": "^2.6",
"php-http/client-common": "^2.0",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^2.0",
Expand Down
39 changes: 36 additions & 3 deletions src/Client/ClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

namespace SimPod\ClickHouseClient\Client;

use GuzzleHttp\Promise\PromiseInterface;
use Amp\ByteStream\Payload;
use Amp\Future;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
Expand All @@ -15,24 +16,56 @@ interface ClickHouseAsyncClient
/**
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function select(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;

/**
* @param array<string, mixed> $params
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function selectWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;

/**
* @param Format<O> $outputFormat
*
* @return Future<Payload>
*
* @template O of Output
*/
public function selectStream(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future;

/**
* @param array<string, mixed> $params
* @param Format<O> $outputFormat
*
* @return Future<Payload>
*
* @template O of Output
*/
public function selectStreamWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future;
}
165 changes: 133 additions & 32 deletions src/Client/PsrClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,39 @@

namespace SimPod\ClickHouseClient\Client;

use Amp\ByteStream\Payload;
use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Psr7\PsrAdapter;
use Amp\Http\Client\Request as AmpRequest;
use Error;
use Exception;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use Http\Client\HttpAsyncClient;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\RequestInterface;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
use SimPod\ClickHouseClient\Exception\ServerError;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Logger\SqlLogger;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
use SimPod\ClickHouseClient\Settings\SettingsProvider;
use SimPod\ClickHouseClient\Sql\SqlFactory;
use SimPod\ClickHouseClient\Sql\ValueFormatter;
use Throwable;

Check failure on line 25 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Coding Standards (8.4)

Type Throwable is not used in this file.

use function Amp\async;
use function uniqid;

class PsrClickHouseAsyncClient implements ClickHouseAsyncClient
{
private SqlFactory $sqlFactory;

/** @param array<non-empty-string, string|string[]> $defaultHeaders */
public function __construct(
private HttpAsyncClient $asyncClient,
private HttpClient $client,
private RequestFactory $requestFactory,
private PsrAdapter $psrAdapter,
private array $defaultHeaders = [],
private SqlLogger|null $sqlLogger = null,
private SettingsProvider $defaultSettings = new EmptySettingsProvider(),
) {
Expand All @@ -45,7 +52,7 @@
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
return $this->selectWithParams($query, [], $outputFormat, $settings);
}

Expand All @@ -59,7 +66,7 @@
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
$formatClause = $outputFormat::toSql();

$sql = $this->sqlFactory->createWithParameters($query, $params);
Expand All @@ -71,24 +78,64 @@
CLICKHOUSE,
params: $params,
settings: $settings,
processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output(
$response->getBody()->__toString(),
),
processResponse: static fn (string $body) => $outputFormat::output($body),
);
}

/**
* {@inheritDoc}
*
* @throws Exception
*/
public function selectStream(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future {
return $this->selectStreamWithParams($query, [], $outputFormat, $settings);
}

/**
* {@inheritDoc}
*
* @throws Exception
*/
public function selectStreamWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future {
$formatClause = $outputFormat::toSql();

$sql = $this->sqlFactory->createWithParameters($query, $params);

return $this->executeStreamRequest(
<<<CLICKHOUSE
$sql
$formatClause
CLICKHOUSE,
params: $params,
settings: $settings,
);
}

/**
* @param array<string, mixed> $params
* @param (callable(ResponseInterface):mixed)|null $processResponse
* @param callable(string):T $processResponse
*
* @return Future<T>
*
* @throws Exception
*
* @template T
*/
private function executeRequest(
string $sql,
array $params,
SettingsProvider $settings,
callable|null $processResponse,
): PromiseInterface {
callable $processResponse,
): Future {
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
Expand All @@ -100,27 +147,81 @@
),
);

$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);
/** @var Future<T> $future */
$future = async(function () use ($processResponse, $request, $sql): mixed {

Check failure on line 151 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Method SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient::executeRequest() throws checked exception Error but it's missing from the PHPDoc @throws tag.
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

Check warning on line 153 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ /** @var Future<T> $future */ $future = async(function () use ($processResponse, $request, $sql): mixed { $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); + try { $response = $this->client->request($this->toAmpRequest($request));

try {
$response = $this->client->request($this->toAmpRequest($request));
$body = $response->getBody()->buffer();

if ($response->getStatus() !== 200) {
throw ServerError::fromResponseContent($body, $response->getStatus());
}

return $processResponse($body);
} finally {
$this->sqlLogger?->stopQuery($id);

Check warning on line 165 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ return $processResponse($body); } finally { - $this->sqlLogger?->stopQuery($id); + } });
}
});

return $future;
}

return Create::promiseFor(
$this->asyncClient->sendAsyncRequest($request),
)
->then(
function (ResponseInterface $response) use ($id, $processResponse) {
$this->sqlLogger?->stopQuery($id);
/**
* @param array<string, mixed> $params
*
* @return Future<Payload>
*
* @throws Exception
*/
private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future
{
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
$this->defaultSettings,
$settings,
),
new RequestOptions(
$params,
),
);

/** @var Future<Payload> $future */
$future = async(function () use ($request, $sql): Payload {

Check failure on line 193 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Static Analysis with PHPStan (8.4)

Method SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient::executeStreamRequest() throws checked exception Error but it's missing from the PHPDoc @throws tag.
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

Check warning on line 195 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ /** @var Future<Payload> $future */ $future = async(function () use ($request, $sql): Payload { $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); + try { $response = $this->client->request($this->toAmpRequest($request));

try {

Check warning on line 197 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "UnwrapFinally": @@ @@ $future = async(function () use ($request, $sql): Payload { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); - - try { - $response = $this->client->request($this->toAmpRequest($request)); - - if ($response->getStatus() !== 200) { - throw ServerError::fromResponseContent( - $response->getBody()->buffer(), - $response->getStatus(), - ); - } - - return $response->getBody(); - } finally { - $this->sqlLogger?->stopQuery($id); + $response = $this->client->request($this->toAmpRequest($request)); + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); } + return $response->getBody(); + $this->sqlLogger?->stopQuery($id); }); return $future;
$response = $this->client->request($this->toAmpRequest($request));

if ($response->getStatus() !== 200) {
throw ServerError::fromResponseContent(
$response->getBody()->buffer(),
$response->getStatus(),
);
}

return $response->getBody();
} finally {
$this->sqlLogger?->stopQuery($id);
}
});

return $future;
}

if ($response->getStatusCode() !== 200) {
throw ServerError::fromResponse($response);
}
/** @throws Error */
private function toAmpRequest(RequestInterface $request): AmpRequest
{
$ampRequest = $this->psrAdapter->fromPsrRequest($request);

if ($processResponse === null) {
return $response;
}
foreach ($this->defaultHeaders as $name => $values) {
$ampRequest->setHeader($name, $values);
}

return $processResponse($response);
},
fn () => $this->sqlLogger?->stopQuery($id),
);
return $ampRequest;
}
}
7 changes: 5 additions & 2 deletions src/Exception/ServerError.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ private function __construct(

public static function fromResponse(ResponseInterface $response): self
{
$bodyContent = $response->getBody()->__toString();
return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode());
}

public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self
{
$errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1
? (int) $codeMatches[1]
: 0;
Expand All @@ -35,7 +38,7 @@ public static function fromResponse(ResponseInterface $response): self
return new self(
$bodyContent,
$errorCode,
$response->getStatusCode(),
$httpStatusCode,
$exceptionName,
);
}
Expand Down
Loading
Loading