I am able to use pysftp to upload a file to an SFTP location with a username and a private key:
with pysftp.Connection(host=job_args["sftp_host"], username=job_args["sftp_username"],
private_key=job_args["sftp_pem"], log=os.path.join(dir_name, 'logs/pysftp.log')) as sftp:
sftp.put(f"{job_args['gzip_target_path']}part-00000-45e353c1-dce3-48a6-ba20-adc4c341a1b8-c000.csv.gz",
f"/TO_MRKTNG_TEST/BRWSNG_BHVR_{datetime.today().strftime('%Y%m%d')}.TXT.gz") # upload file to public/ on remote
sftp.close()
But I am getting an error when I try the same with spark-sftp:
result_df.write.format("com.springml.spark.sftp"). \
option("host", job_args["sftp_host"]). \
option("username", job_args["sftp_username"]). \
option("pem", job_args["sftp_pem"]). \
option("fileType", "csv"). \
option("delimiter", ","). \
save(f"/TO_MRKTNG_TEST/BRWSNG_BHVR_{datetime.today().strftime('%Y%m%d')}.TXT.gz")
Here is the error:
answer = 'xro102', gateway_client = <py4j.java_gateway.GatewayClient object at 0x11793b710>, target_id = 'o101', name = 'save'
def get_return_value(answer, gateway_client, target_id=None, name=None):
"""Converts an answer received from the Java gateway into a Python object.
For example, string representation of integers are converted to Python
integer, string representation of objects are converted to JavaObject
instances, etc.
:param answer: the string returned by the Java gateway
:param gateway_client: the gateway client used to communicate with the Java
Gateway. Only necessary if the answer is a reference (e.g., object,
list, map)
:param target_id: the name of the object from which the answer comes from
(e.g., *object1* in `object1.hello()`). Optional.
:param name: the name of the member from which the answer comes from
(e.g., *hello* in `object1.hello()`). Optional.
"""
if is_error(answer)[0]:
if len(answer) > 1:
type = answer[1]
value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
if answer[1] == REFERENCE_TYPE:
raise Py4JJavaError(
"An error occurred while calling {0}{1}{2}.\n".
> format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling o101.save.
E : com.jcraft.jsch.JSchException: invalid privatekey: [B@35c88b66
E at com.jcraft.jsch.KeyPair.load(KeyPair.java:664)
E at com.jcraft.jsch.KeyPair.load(KeyPair.java:561)
E at com.jcraft.jsch.IdentityFile.newInstance(IdentityFile.java:40)
E at com.jcraft.jsch.JSch.addIdentity(JSch.java:407)
E at com.jcraft.jsch.JSch.addIdentity(JSch.java:367)
E at com.springml.sftp.client.SFTPClient.createSFTPChannel(SFTPClient.java:266)
E at com.springml.sftp.client.SFTPClient.copyToFTP(SFTPClient.java:102)
E at com.springml.spark.sftp.DefaultSource.upload(DefaultSource.scala:160)
E at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:126)
E at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
E at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
E at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
E at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
E at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
E at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
E at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
E at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
E at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
E at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
E at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
E at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.lang.Thread.run(Thread.java:748)
../../../.virtualenvs/venv/regionsBank-aamFileExport-spark-etl/lib/python3.7/site-packages/py4j/protocol.py:328: Py4JJavaError
=============================================================================================== 1 failed, 1 deselected in 23.42 seconds ===============================================================================================
(regionsBank-aamFileExport-spark-etl) kavenkats-MacBook-Pro:regionsBank-aamFileExport-spark-etl kavenkat$
Am I missing something?
Thanks
I am able to use pysftp to upload a file to an SFTP location with a username and a private key:
But I am getting an error when I try the same with spark-sftp:
Here is the error:
Am I missing something?
Thanks