Skip to content

Commit b978289

Browse files
committed
Merge remote-tracking branch 'origin/master'
# Conflicts: # src/Transport/ClickhouseCLIClientTransport.php
2 parents 0ee45b5 + 4d17b41 commit b978289

File tree

5 files changed

+126
-12
lines changed

5 files changed

+126
-12
lines changed

src/Client.php

+29
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,35 @@ public function insertFiles(string $table, array $columns, array $files, string
406406
return $this->getTransport()->sendAsyncFilesWithQuery($this->getServer(), $query, $files, $concurrency);
407407
}
408408

409+
/**
410+
* Performs insert query using all given files as one block of data
411+
*
412+
* @param string $table
413+
* @param array $columns
414+
* @param array $files
415+
* @param string|null $format
416+
* @param array $options
417+
*
418+
* @return mixed
419+
* @throws ClientException
420+
*/
421+
public function insertFilesAsOne(string $table, array $columns, array $files, string $format = null, $options = [])
422+
{
423+
if (is_null($format)) {
424+
$format = Format::CSV;
425+
}
426+
427+
$query = 'INSERT INTO '.$table.' ('.implode(', ', $columns).') FORMAT '.strtoupper($format);
428+
429+
foreach ($files as $file) {
430+
if (!is_file($file)) {
431+
throw ClientException::insertFileNotFound($file);
432+
}
433+
}
434+
435+
return $this->getTransport()->sendFilesAsOneWithQuery($this->getServer(), $query, $files, $options);
436+
}
437+
409438
/**
410439
* Executes query.
411440
*

src/Exceptions/ClientException.php

+5
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,9 @@ public static function insertFileNotFound($file)
3737
{
3838
return new static('File '.$file.' is not found');
3939
}
40+
41+
public static function notSupported($operation)
42+
{
43+
return new static('Operation '.$operation.' is not supported');
44+
}
4045
}

src/Interfaces/TransportInterface.php

+15-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ interface TransportInterface
2222
* @return bool
2323
*/
2424
public function send(Server $server, string $query): bool;
25-
25+
2626
/**
2727
* Sends async insert queries with given files.
2828
*
@@ -34,7 +34,19 @@ public function send(Server $server, string $query): bool;
3434
* @return array
3535
*/
3636
public function sendAsyncFilesWithQuery(Server $server, string $query, array $files, int $concurrency = 5): array;
37-
37+
38+
/**
39+
* Sends files as one block of data.
40+
*
41+
* @param \Tinderbox\Clickhouse\Server $server
42+
* @param string $query
43+
* @param array $files
44+
* @param array $settings
45+
*
46+
* @return bool
47+
*/
48+
public function sendFilesAsOneWithQuery(Server $server, string $query, array $files, array $settings = []): bool;
49+
3850
/**
3951
* Executes SELECT queries and returns result.
4052
*
@@ -47,7 +59,7 @@ public function sendAsyncFilesWithQuery(Server $server, string $query, array $fi
4759
* @return \Tinderbox\Clickhouse\Query\Result
4860
*/
4961
public function get(Server $server, string $query, $tables = null): Result;
50-
62+
5163
/**
5264
* Executes async SELECT queries and returns result.
5365
*

src/Transport/ClickhouseCLIClientTransport.php

+60-9
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,39 @@ class ClickhouseCLIClientTransport implements TransportInterface
2121
*/
2222
protected $executablePath;
2323

24+
/**
25+
* Path to executable cat command
26+
*
27+
* @var string
28+
*/
29+
protected $catExecutablePath;
30+
2431
/**
2532
* Last execute query
2633
*
2734
* @var string
2835
*/
2936
protected $lastQuery = '';
3037

38+
/**
39+
* Use special ccat command to cat files into one without ARG_MAX limits
40+
*
41+
* @var bool
42+
*/
43+
protected $useCcat;
44+
3145
/**
3246
* ClickhouseCLIClientTransport constructor.
3347
*
3448
* @param string|null $executablePath
49+
* @param string|null $catExecutablePath
50+
* @param bool $useCcat
3551
*/
36-
public function __construct(string $executablePath = null)
52+
public function __construct(string $executablePath = null, string $catExecutablePath = null, bool $useCcat = false)
3753
{
3854
$this->setExecutablePath($executablePath);
55+
$this->setCatExecutablePath($catExecutablePath);
56+
$this->useCcat = $useCcat;
3957
}
4058

4159
/**
@@ -52,6 +70,20 @@ protected function setExecutablePath(string $executablePath = null)
5270
$this->executablePath = $executablePath;
5371
}
5472

73+
/**
74+
* Set path to cat executable.
75+
*
76+
* @param string|null $catExecutablePath
77+
*/
78+
protected function setCatExecutablePath(string $catExecutablePath = null)
79+
{
80+
if (is_null($catExecutablePath)) {
81+
$catExecutablePath = 'cat';
82+
}
83+
84+
$this->catExecutablePath = $catExecutablePath;
85+
}
86+
5587
/**
5688
* Sends query to given $server.
5789
*
@@ -120,15 +152,27 @@ public function sendFilesAsOneWithQuery(Server $server, string $query, array $fi
120152
* (IFS=$'\n'; cat $(< tmp-file))
121153
*
122154
* to iterate through file and cat each file into stdout
155+
*
156+
* ccat is used to ignore ARG_MAX constant when trying to push too many files
123157
*/
124-
$fileName = $this->writeTemporaryFile(implode(PHP_EOL, array_map(function(string $file) {
125-
return 'cat '.$file;
126-
}, $files)));
158+
if ($this->useCcat) {
159+
$files = implode(PHP_EOL, $files);
160+
} else {
161+
$files = implode(PHP_EOL, array_map(function(string $file) {
162+
return 'cat '.$file;
163+
}, $files));
164+
}
165+
166+
$fileName = $this->writeTemporaryFile($files);
127167

128168
$command = $this->buildCommandForWriteFilesAsOne($server, $query, $fileName, $settings);
129169

130170
try {
131171
$this->executeCommand($command);
172+
173+
if (!is_null($fileName)) {
174+
$this->removeTemporaryFile($fileName);
175+
}
132176
} catch (\Throwable $e) {
133177
$this->removeTemporaryFile($fileName);
134178

@@ -243,7 +287,7 @@ protected function buildCommandForWrite(Server $server, string $query, string $f
243287
$command = [];
244288

245289
if (!is_null($file)) {
246-
$command[] = "$(which cat) ".$file.' |';
290+
$command[] = $this->catExecutablePath.' '.$file.' |';
247291
}
248292

249293
$command = array_merge($command, [
@@ -275,14 +319,21 @@ protected function buildCommandForWriteFilesAsOne(Server $server, string $query,
275319
{
276320
$query = escapeshellarg($query);
277321

278-
$command = [
279-
'$(cat '.$file.') | ',
322+
$command = [];
323+
324+
if ($this->useCcat) {
325+
$command[] = $this->catExecutablePath.' '.$file.' | ';
326+
} else {
327+
$command[] = '$('.$this->catExecutablePath.' '.$file.') | ';
328+
}
329+
330+
$command = array_merge($command, [
280331
$this->executablePath,
281332
"--host='{$server->getHost()}'",
282333
"--port='{$server->getPort()}'",
283334
"--database='{$server->getDatabase()}'",
284335
"--query={$query}",
285-
];
336+
]);
286337

287338
foreach ($settings as $key => $value) {
288339
$command[] = '--'.$key.'='.escapeshellarg($value);
@@ -306,7 +357,7 @@ protected function buildCommandForRead(Server $server, string $query, $tables =
306357
$fileName = $this->writeTemporaryFile($query);
307358

308359
$command = [
309-
"cat {$fileName} |",
360+
$this->catExecutablePath." {$fileName} |",
310361
$this->executablePath,
311362
"--host='{$server->getHost()}'",
312363
"--port='{$server->getPort()}'",

src/Transport/HttpTransport.php

+17
Original file line numberDiff line numberDiff line change
@@ -429,4 +429,21 @@ protected function buildRequestUri(Server $server, array $query = []): string
429429

430430
return $uri.'?'.http_build_query($query);
431431
}
432+
433+
/**
434+
* Sends files as one block of data.
435+
*
436+
* NOT SUPPORTED VIA HTTP INTERFACE
437+
*
438+
* @param \Tinderbox\Clickhouse\Server $server
439+
* @param string $query
440+
* @param array $files
441+
* @param array $settings
442+
*
443+
* @return bool
444+
*/
445+
public function sendFilesAsOneWithQuery(Server $server, string $query, array $files, array $settings = []): bool
446+
{
447+
throw ClientException::notSupported('inserting multiple files as one via http interface');
448+
}
432449
}

0 commit comments

Comments
 (0)