Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions src/Internal/ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ public function rollbackTransaction(): Future
return $deferred->getFuture();
}

public function prepare(string $sql): Future
{
return $this->executePrepare(new Tasks\PrepareStatement($this->config, $sql));
}

public function execute(string $sql, array $params = []): Future
{
return $this->executeQuery(new Tasks\ExecuteStatement($this->config, $sql, $params));
Expand All @@ -237,9 +232,11 @@ public function close(): void
}
}

public function shutdown(): void
public function countParameters(string $sql): int
{
$this->worker->shutdown();
$count = preg_match_all('/\?|:[a-zA-Z_]\w*/', $sql, $matches);

return $count ?: 0;
}

/**
Expand Down Expand Up @@ -319,38 +316,6 @@ protected function executeQuery(Task $task): Future
return $deferred->getFuture();
}

/**
* @return Future<array{parameterCount: int, columnDefinitions: array<array{name: string, type: string, declaredType: string|null, table: string|null, length: int, flags: int, decimals: int}>}>
*/
protected function executePrepare(Task $task): Future
{
if ($this->isClosed()) {
$this->throwConnectionException();
}

$execution = $this->worker->submit($task);

/** @var Result $taskResult */
$taskResult = $execution->await();

if ($taskResult->failed()) {
$deferred = new DeferredFuture();
$deferred->error(new Error($taskResult->message() ?? "Failed to prepare statement"));

return $deferred->getFuture();
}

$this->lastUsedAt = time();

$data = $taskResult->output();
$data['columnDefinitions'] = $this->buildColumnDefinitions($data['columnDefinitions'] ?? null);

$deferred = new DeferredFuture();
$deferred->complete($data);

return $deferred->getFuture();
}

protected function throwConnectionException(): never
{
throw new ConnectionFailureException('The connection has been closed');
Expand Down
8 changes: 0 additions & 8 deletions src/Internal/SqliteConnectionStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,11 @@ public function execute(array $params = []): SqliteResult
$result = $execution->await();

if ($result instanceof Error) {
$this->processor->shutdown();

throw new SqliteException('Failed to execute statement: ' . $result->getMessage(), 0, $result);
}

$this->lastUsedAt = time();

if ($this->processor::class === ConnectionProcessor::class) {
$this->processor->shutdown();
}

return $result;
}

Expand Down Expand Up @@ -113,8 +107,6 @@ public function close(): void
{
$this->closed = true;
$this->bindings = [];

$this->processor->shutdown();
}

public function onClose(Closure $onClose): void
Expand Down
20 changes: 2 additions & 18 deletions src/Internal/SqliteConnectionTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

use Amp\Sql\SqlTransactionIsolation;
use Closure;
use Error;
use Phenix\Sqlite\Contracts\SqliteResult;
use Phenix\Sqlite\Contracts\SqliteStatement;
use Phenix\Sqlite\Contracts\SqliteTransaction;
use Phenix\Sqlite\Internal\Exceptions\SqliteException;
use Phenix\Sqlite\Internal\Exceptions\SqliteTransactionException;

class SqliteConnectionTransaction implements SqliteTransaction
Expand Down Expand Up @@ -46,17 +44,11 @@ public function prepare(string $sql): SqliteStatement
$this->throwTransactionException();
}

$data = $this->processor->prepare($sql)->await();

if ($data instanceof Error) {
throw new SqliteException('Failed to prepare statement: ' . $data->getMessage());
}

return new SqliteConnectionStatement(
processor: $this->processor,
sql: $sql,
parameterCount: $data['parameterCount'] ?? 0,
columnDefinitions: $data['columnDefinitions'] ?? [],
parameterCount: $this->processor->countParameters($sql),
columnDefinitions: [],
);
}

Expand Down Expand Up @@ -103,10 +95,6 @@ public function commit(): void
foreach ($this->onCommitCallbacks as $callback) {
$callback();
}

if ($this->savepointId === null) {
$this->processor->shutdown();
}
}

public function rollback(): void
Expand All @@ -128,10 +116,6 @@ public function rollback(): void
foreach ($this->onRollbackCallbacks as $callback) {
$callback();
}

if ($this->savepointId === null) {
$this->processor->shutdown();
}
}

public function isActive(): bool
Expand Down
24 changes: 0 additions & 24 deletions src/Internal/SqliteWorkerFactory.php

This file was deleted.

24 changes: 0 additions & 24 deletions src/Internal/Tasks/ConnectDatabase.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ protected function mapToSqliteType(string $nativeType): string
};
}

protected function countParameters(string $sql): int
{
$count = preg_match_all('/\?|:[a-zA-Z_][a-zA-Z0-9_]*/', $sql, $matches);

return $count ?: 0;
}

protected function query(PDO $pdo, string $sql): Result
{
$stmt = $pdo->query($sql);
Expand Down Expand Up @@ -164,23 +157,6 @@ protected function query(PDO $pdo, string $sql): Result
]);
}

protected function prepare(PDO $pdo, string $sql): Result
{
$stmt = $pdo->prepare($sql);

if (! $stmt) {
return Result::failure(message: "Failed to prepare statement: {$sql}");
}

$parameterCount = $this->countParameters($sql);
$columnDefinitions = $this->buildColumnDefinitions($stmt);

return Result::success([
'parameterCount' => $parameterCount,
'columnDefinitions' => $columnDefinitions,
]);
}

protected function execute(PDO $pdo, string $sql, array $params): Result
{
$stmt = $pdo->prepare($sql);
Expand Down
31 changes: 0 additions & 31 deletions src/Internal/Tasks/PrepareStatement.php

This file was deleted.

35 changes: 0 additions & 35 deletions src/Internal/Tasks/PrepareTransactionStatement.php

This file was deleted.

5 changes: 0 additions & 5 deletions src/Internal/TransactionConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ public function query(string $query): Future
return $this->executeQuery(new ExecuteTransactionQuery($this->config, $query));
}

public function prepare(string $sql): Future
{
return $this->executePrepare(new Tasks\PrepareTransactionStatement($this->config, $sql));
}

public function execute(string $sql, array $params = []): Future
{
return $this->executeQuery(new Tasks\ExecuteTransactionStatement($this->config, $sql, $params));
Expand Down
5 changes: 0 additions & 5 deletions src/Internal/sqlite-worker.php

This file was deleted.

18 changes: 6 additions & 12 deletions src/SqliteConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
use Amp\Sql\SqlTransactionIsolation;
use Amp\Sql\SqlTransactionIsolationLevel;
use Closure;
use Error;
use Phenix\Sqlite\Contracts\SqliteConnection as SqliteConnectionContract;
use Phenix\Sqlite\Contracts\SqliteResult;
use Phenix\Sqlite\Contracts\SqliteStatement;
use Phenix\Sqlite\Contracts\SqliteTransaction;
use Phenix\Sqlite\Internal\Exceptions\SqliteException;
use Phenix\Sqlite\Internal\SqliteConnectionStatement;
use Phenix\Sqlite\Internal\SqliteWorkerFactory;
use Revolt\EventLoop;
use Throwable;

use function Amp\Parallel\Worker\getWorker;

class SqliteConnection implements SqliteConnectionContract
{
use ForbidCloning;
Expand Down Expand Up @@ -108,7 +108,7 @@ public function beginTransaction(): SqliteTransaction

$this->busy = $deferred = new DeferredFuture();

$processor = new Internal\TransactionConnectionProcessor($this->config, SqliteWorkerFactory::create());
$processor = new Internal\TransactionConnectionProcessor($this->config, getWorker());

try {
$result = $processor->beginTransaction()->await();
Expand Down Expand Up @@ -141,20 +141,14 @@ public function prepare(string $sql): SqliteStatement

$this->busy = $deferred = new DeferredFuture();

$processor = new Internal\ConnectionProcessor($this->config, SqliteWorkerFactory::create());
$processor = new Internal\ConnectionProcessor($this->config, getWorker());

try {
$data = $processor->prepare($sql)->await();

if ($data instanceof Error) {
throw new SqliteException('Failed to prepare statement: ' . $data->getMessage());
}

$statement = new SqliteConnectionStatement(
processor: $processor,
sql: $sql,
parameterCount: $data['parameterCount'] ?? 0,
columnDefinitions: $data['columnDefinitions'] ?? [],
parameterCount: $processor->countParameters($sql),
columnDefinitions: [],
);
} catch (Throwable $exception) {
$this->busy = null;
Expand Down
40 changes: 0 additions & 40 deletions tests/Unit/Tasks/PrepareStatementTest.php

This file was deleted.

Loading