Skip to content

Commit 4d17b41

Browse files
author
TinderBox
authored
Merge pull request #4 from the-tinderbox/ability_to_send_multiple_files_as_one_file
Ability to send multiple files as one file
2 parents 14a960c + 6f8d6fb commit 4d17b41

File tree

5 files changed

+206
-13
lines changed

5 files changed

+206
-13
lines changed

src/Client.php

+29
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,35 @@ public function insertFiles(string $table, array $columns, array $files, string
406406
return $this->getTransport()->sendAsyncFilesWithQuery($this->getServer(), $query, $files, $concurrency);
407407
}
408408

409+
/**
410+
* Performs insert query using all given files as one block of data
411+
*
412+
* @param string $table
413+
* @param array $columns
414+
* @param array $files
415+
* @param string|null $format
416+
* @param array $options
417+
*
418+
* @return mixed
419+
* @throws ClientException
420+
*/
421+
public function insertFilesAsOne(string $table, array $columns, array $files, string $format = null, $options = [])
422+
{
423+
if (is_null($format)) {
424+
$format = Format::CSV;
425+
}
426+
427+
$query = 'INSERT INTO '.$table.' ('.implode(', ', $columns).') FORMAT '.strtoupper($format);
428+
429+
foreach ($files as $file) {
430+
if (!is_file($file)) {
431+
throw ClientException::insertFileNotFound($file);
432+
}
433+
}
434+
435+
return $this->getTransport()->sendFilesAsOneWithQuery($this->getServer(), $query, $files, $options);
436+
}
437+
409438
/**
410439
* Executes query.
411440
*

src/Exceptions/ClientException.php

+5
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,9 @@ public static function insertFileNotFound($file)
3737
{
3838
return new static('File '.$file.' is not found');
3939
}
40+
41+
public static function notSupported($operation)
42+
{
43+
return new static('Operation '.$operation.' is not supported');
44+
}
4045
}

src/Interfaces/TransportInterface.php

+15-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ interface TransportInterface
2222
* @return bool
2323
*/
2424
public function send(Server $server, string $query): bool;
25-
25+
2626
/**
2727
* Sends async insert queries with given files.
2828
*
@@ -34,7 +34,19 @@ public function send(Server $server, string $query): bool;
3434
* @return array
3535
*/
3636
public function sendAsyncFilesWithQuery(Server $server, string $query, array $files, int $concurrency = 5): array;
37-
37+
38+
/**
39+
* Sends files as one block of data.
40+
*
41+
* @param \Tinderbox\Clickhouse\Server $server
42+
* @param string $query
43+
* @param array $files
44+
* @param array $settings
45+
*
46+
* @return bool
47+
*/
48+
public function sendFilesAsOneWithQuery(Server $server, string $query, array $files, array $settings = []): bool;
49+
3850
/**
3951
* Executes SELECT queries and returns result.
4052
*
@@ -47,7 +59,7 @@ public function sendAsyncFilesWithQuery(Server $server, string $query, array $fi
4759
* @return \Tinderbox\Clickhouse\Query\Result
4860
*/
4961
public function get(Server $server, string $query, $tables = null): Result;
50-
62+
5163
/**
5264
* Executes async SELECT queries and returns result.
5365
*

src/Transport/ClickhouseCLIClientTransport.php

+140-10
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,39 @@ class ClickhouseCLIClientTransport implements TransportInterface
2121
*/
2222
protected $executablePath;
2323

24+
/**
25+
* Path to executable cat command
26+
*
27+
* @var string
28+
*/
29+
protected $catExecutablePath;
30+
2431
/**
2532
* Last execute query
2633
*
2734
* @var string
2835
*/
2936
protected $lastQuery = '';
3037

38+
/**
39+
* Use special ccat command to cat files into one without ARG_MAX limits
40+
*
41+
* @var bool
42+
*/
43+
protected $useCcat;
44+
3145
/**
3246
* ClickhouseCLIClientTransport constructor.
3347
*
3448
* @param string|null $executablePath
49+
* @param string|null $catExecutablePath
50+
* @param bool $useCcat
3551
*/
36-
public function __construct(string $executablePath = null)
52+
public function __construct(string $executablePath = null, string $catExecutablePath = null, bool $useCcat = false)
3753
{
3854
$this->setExecutablePath($executablePath);
55+
$this->setCatExecutablePath($catExecutablePath);
56+
$this->useCcat = $useCcat;
3957
}
4058

4159
/**
@@ -52,6 +70,20 @@ protected function setExecutablePath(string $executablePath = null)
5270
$this->executablePath = $executablePath;
5371
}
5472

73+
/**
74+
* Set path to cat executable.
75+
*
76+
* @param string|null $catExecutablePath
77+
*/
78+
protected function setCatExecutablePath(string $catExecutablePath = null)
79+
{
80+
if (is_null($catExecutablePath)) {
81+
$catExecutablePath = 'cat';
82+
}
83+
84+
$this->catExecutablePath = $catExecutablePath;
85+
}
86+
5587
/**
5688
* Sends query to given $server.
5789
*
@@ -99,6 +131,57 @@ public function sendAsyncFilesWithQuery(Server $server, string $query, array $fi
99131
return $result;
100132
}
101133

134+
/**
135+
* Sends files as one block of data.
136+
*
137+
* @param \Tinderbox\Clickhouse\Server $server
138+
* @param string $query
139+
* @param array $files
140+
* @param array $settings
141+
*
142+
* @return bool
143+
* @throws \Throwable
144+
*/
145+
public function sendFilesAsOneWithQuery(Server $server, string $query, array $files, array $settings = []): bool
146+
{
147+
$this->setLastQuery($query);
148+
149+
/*
150+
* We will put the list of files into tmp file and then we will use command:
151+
*
152+
* (IFS=$'\n'; cat $(< tmp-file))
153+
*
154+
* to iterate through file and cat each file into stdout
155+
*
156+
* ccat is used to ignore ARG_MAX constant when trying to push too many files
157+
*/
158+
if ($this->useCcat) {
159+
$files = implode(PHP_EOL, $files);
160+
} else {
161+
$files = implode(PHP_EOL, array_map(function(string $file) {
162+
return 'cat '.$file;
163+
}, $files));
164+
}
165+
166+
$fileName = $this->writeTemporaryFile($files);
167+
168+
$command = $this->buildCommandForWriteFilesAsOne($server, $query, $fileName, $settings);
169+
170+
try {
171+
$this->executeCommand($command);
172+
173+
if (!is_null($fileName)) {
174+
$this->removeTemporaryFile($fileName);
175+
}
176+
} catch (\Throwable $e) {
177+
$this->removeTemporaryFile($fileName);
178+
179+
throw $e;
180+
}
181+
182+
return true;
183+
}
184+
102185
/**
103186
* Executes SELECT queries and returns result.
104187
*
@@ -120,13 +203,13 @@ public function get(Server $server, string $query, $tables = null): Result
120203
$response = $this->executeCommand($command);
121204

122205
if (!is_null($file)) {
123-
$this->removeQueryFile($file);
206+
$this->removeTemporaryFile($file);
124207
}
125208

126209
return $this->assembleResult($response);
127210
} catch (\Throwable $e) {
128211
if (!is_null($file)) {
129-
$this->removeQueryFile($file);
212+
$this->removeTemporaryFile($file);
130213
}
131214

132215
throw $e;
@@ -165,7 +248,7 @@ public function getAsync(Server $server, array $queries, int $concurrency = 5):
165248
*
166249
* @return string
167250
*/
168-
protected function writeQueryInFile(string $query) : string
251+
protected function writeTemporaryFile(string $query) : string
169252
{
170253
$tmpDir = sys_get_temp_dir();
171254
$fileName = tempnam($tmpDir, 'clickhouse_client');
@@ -182,7 +265,7 @@ protected function writeQueryInFile(string $query) : string
182265
*
183266
* @param string $fileName
184267
*/
185-
protected function removeQueryFile(string $fileName)
268+
protected function removeTemporaryFile(string $fileName)
186269
{
187270
unlink($fileName);
188271
}
@@ -193,17 +276,18 @@ protected function removeQueryFile(string $fileName)
193276
* @param \Tinderbox\Clickhouse\Server $server
194277
* @param string $query
195278
* @param string|null $file
279+
* @param array $settings
196280
*
197281
* @return string
198282
*/
199-
protected function buildCommandForWrite(Server $server, string $query, string $file = null) : string
283+
protected function buildCommandForWrite(Server $server, string $query, string $file = null, array $settings = []) : string
200284
{
201285
$query = escapeshellarg($query);
202286

203287
$command = [];
204288

205289
if (!is_null($file)) {
206-
$command[] = "cat ".$file.' |';
290+
$command[] = $this->catExecutablePath.' '.$file.' |';
207291
}
208292

209293
$command = array_merge($command, [
@@ -214,6 +298,47 @@ protected function buildCommandForWrite(Server $server, string $query, string $f
214298
"--query={$query}"
215299
]);
216300

301+
foreach ($settings as $key => $value) {
302+
$command[] = '--'.$key.'='.escapeshellarg($value);
303+
}
304+
305+
return implode(' ', $command);
306+
}
307+
308+
/**
309+
* Builds command for write
310+
*
311+
* @param \Tinderbox\Clickhouse\Server $server
312+
* @param string $query
313+
* @param string|null $file
314+
* @param array $settings
315+
*
316+
* @return string
317+
*/
318+
protected function buildCommandForWriteFilesAsOne(Server $server, string $query, string $file, array $settings = []) : string
319+
{
320+
$query = escapeshellarg($query);
321+
322+
$command = [];
323+
324+
if ($this->useCcat) {
325+
$command[] = $this->catExecutablePath.' '.$file.' | ';
326+
} else {
327+
$command[] = '$('.$this->catExecutablePath.' '.$file.') | ';
328+
}
329+
330+
$command = array_merge($command, [
331+
$this->executablePath,
332+
"--host='{$server->getHost()}'",
333+
"--port='{$server->getPort()}'",
334+
"--database='{$server->getDatabase()}'",
335+
"--query={$query}",
336+
]);
337+
338+
foreach ($settings as $key => $value) {
339+
$command[] = '--'.$key.'='.escapeshellarg($value);
340+
}
341+
217342
return implode(' ', $command);
218343
}
219344

@@ -223,15 +348,16 @@ protected function buildCommandForWrite(Server $server, string $query, string $f
223348
* @param \Tinderbox\Clickhouse\Server $server
224349
* @param string $query
225350
* @param null $tables
351+
* @param array $settings
226352
*
227353
* @return array
228354
*/
229-
protected function buildCommandForRead(Server $server, string $query, $tables = null) : array
355+
protected function buildCommandForRead(Server $server, string $query, $tables = null, array $settings = []) : array
230356
{
231-
$fileName = $this->writeQueryInFile($query);
357+
$fileName = $this->writeTemporaryFile($query);
232358

233359
$command = [
234-
"cat {$fileName} |",
360+
$this->catExecutablePath." {$fileName} |",
235361
$this->executablePath,
236362
"--host='{$server->getHost()}'",
237363
"--port='{$server->getPort()}'",
@@ -249,6 +375,10 @@ protected function buildCommandForRead(Server $server, string $query, $tables =
249375
}
250376
}
251377

378+
foreach ($settings as $key => $value) {
379+
$command[] = '--'.$key.'='.escapeshellarg($value);
380+
}
381+
252382
return [implode(' ', $command), $fileName];
253383
}
254384

src/Transport/HttpTransport.php

+17
Original file line numberDiff line numberDiff line change
@@ -429,4 +429,21 @@ protected function buildRequestUri(Server $server, array $query = []): string
429429

430430
return $uri.'?'.http_build_query($query);
431431
}
432+
433+
/**
434+
* Sends files as one block of data.
435+
*
436+
* NOT SUPPORTED VIA HTTP INTERFACE
437+
*
438+
* @param \Tinderbox\Clickhouse\Server $server
439+
* @param string $query
440+
* @param array $files
441+
* @param array $settings
442+
*
443+
* @return bool
444+
*/
445+
public function sendFilesAsOneWithQuery(Server $server, string $query, array $files, array $settings = []): bool
446+
{
447+
throw ClientException::notSupported('inserting multiple files as one via http interface');
448+
}
432449
}

0 commit comments

Comments
 (0)