OIDC Auth does not work on Windows Python Client #2311

Open
struie opened this issue Dec 12, 2024 · 3 comments

struie commented Dec 12, 2024

    if 'idp-certificate-authority-data' in provider['config']:
        ca_cert = tempfile.NamedTemporaryFile(delete=True)
        if PY3:
            cert = base64.b64decode(
                provider['config']['idp-certificate-authority-data']
            ).decode('utf-8')
        else:
            cert = base64.b64decode(
                provider['config']['idp-certificate-authority-data'] + "=="
            )
        with open(ca_cert.name, 'w') as fh:
            fh.write(cert)

OIDC connections from a Windows desktop fail because of the code above. Some suggested workarounds follow.

In Python on Windows, reopening the tempfile.NamedTemporaryFile for writing fails with a permission-denied error (PermissionError).

The PermissionError with tempfile.NamedTemporaryFile on Windows is a common issue. With delete=True, NamedTemporaryFile keeps its own handle to the file open, and on Windows a file in that state cannot be opened a second time by name, even from the same process. On Unix the second open succeeds, which is why the same code works there.
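The behaviour described above can be checked with a small probe. This is only a sketch: the function name `can_reopen_while_open` is made up for illustration, and it mirrors the reopen-by-name pattern from the client code rather than calling the client itself. On Unix it should report that the reopen works; on Windows it is expected to report that it does not.

```python
import tempfile


def can_reopen_while_open():
    """Return True if a NamedTemporaryFile can be reopened by name while it is still open."""
    with tempfile.NamedTemporaryFile(delete=True) as tmp:
        try:
            # Same pattern as the client code: reopen the still-open file by name.
            with open(tmp.name, 'w') as fh:
                fh.write("probe")
        except PermissionError:
            # The failure reported in this issue.
            return False
        return True
```

Running `print(can_reopen_while_open())` on the affected machine gives a quick yes/no answer without needing a cluster or an OIDC provider.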
Solution 1: Use delete=False

On Windows, pass delete=False to tempfile.NamedTemporaryFile so that the file is not removed when it is closed. You can then close it, reopen it by name, and delete it manually when you are done.
Example:

    import tempfile
    import os

    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file.write(b"Hello, world!")
        temp_path = temp_file.name  # Save the file path to use it later

    print(f"Temporary file created at {temp_path}")

    # Reopen the file
    with open(temp_path, 'r') as f:
        content = f.read()
        print("Content from file:", content)

    # Delete the file manually
    os.remove(temp_path)
    print(f"Temporary file {temp_path} has been deleted.")
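A related option, not mentioned in the original comment: Python 3.12 added a delete_on_close parameter to NamedTemporaryFile. With delete_on_close=False the file survives close() and is removed when the with block exits, so no manual os.remove is needed. The sketch below assumes that behaviour and falls back to delete=False on older interpreters; the helper name `write_then_reread` is hypothetical.

```python
import os
import sys
import tempfile


def write_then_reread(text):
    """Write text to a temporary file, close it, reopen it by name, and return the contents."""
    if sys.version_info >= (3, 12):
        # delete_on_close=False: close() keeps the file; leaving the block removes it.
        with tempfile.NamedTemporaryFile('w', delete_on_close=False) as tmp:
            tmp.write(text)
            tmp.close()
            with open(tmp.name, 'r') as fh:
                return fh.read()
    # Older interpreters: use delete=False and remove the file ourselves.
    with tempfile.NamedTemporaryFile('w', delete=False) as tmp:
        tmp.write(text)
    try:
        with open(tmp.name, 'r') as fh:
            return fh.read()
    finally:
        os.remove(tmp.name)
```

Either branch closes the file before reopening it, which is the step that matters on Windows.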

Solution 2: Use tempfile.TemporaryDirectory

If you want to create multiple temporary files, consider using tempfile.TemporaryDirectory and creating files inside it. This way, you avoid PermissionError since each file can be opened and closed freely.
Example:

    import tempfile
    import os

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = os.path.join(temp_dir, 'example.txt')
        with open(temp_path, 'w') as temp_file:
            temp_file.write("Hello, world!")

        print(f"Temporary file created at {temp_path}")

        # Reopen the file
        with open(temp_path, 'r') as f:
            content = f.read()
            print("Content from file:", content)

    print("Temporary directory and files have been cleaned up.")

Solution 3: Use tempfile.mkstemp()

If you need full control of the file descriptor, you can use tempfile.mkstemp(), which returns a file descriptor and a path. You can close the file descriptor as soon as it's created, and then reopen the file using the file path.
Example:

    import tempfile
    import os

    fd, temp_path = tempfile.mkstemp()

    # Close the file descriptor immediately to avoid lock issues
    os.close(fd)

    # Write to the file
    with open(temp_path, 'w') as temp_file:
        temp_file.write("Hello, world!")

    print(f"Temporary file created at {temp_path}")

    # Read the file
    with open(temp_path, 'r') as f:
        content = f.read()
        print("Content from file:", content)

    # Clean up
    os.remove(temp_path)
    print(f"Temporary file {temp_path} has been deleted.")

Solution 4: Use tempfile.SpooledTemporaryFile

If you don't need to persist the file on disk and just want temporary storage, use tempfile.SpooledTemporaryFile(), which will store data in memory. This avoids file system constraints and avoids permission issues.
Example:

    import tempfile

    with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+t') as temp_file:
        temp_file.write("Hello, world!")
        temp_file.seek(0)  # Rewind the file pointer to read the contents
        content = temp_file.read()
        print("Content from in-memory file:", content)

Note: SpooledTemporaryFile keeps the file in memory until its size exceeds max_size, after which it spills over to disk.

Which Solution Should You Use?

If you need a temporary file on disk, Solution 1 (delete=False) or Solution 3 (mkstemp) are best.
If you need a temporary directory to create multiple files, Solution 2 (TemporaryDirectory) is ideal.
If you only need in-memory storage, Solution 4 (SpooledTemporaryFile) is the simplest and avoids filesystem issues.

These solutions work reliably on Windows, macOS, and Linux. Let me know if you'd like an update to your existing script to use one of these methods.
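For the certificate case at the top of this issue, Solution 3 could be wrapped in a context manager so the file is always cleaned up. This is a minimal sketch, not the client's actual code: the name `temp_ca_cert` and the `.pem` suffix are assumptions for illustration. It writes the decoded bytes through the descriptor that mkstemp returns and closes it before yielding the path.

```python
import base64
import contextlib
import os
import tempfile


@contextlib.contextmanager
def temp_ca_cert(b64_cert_data):
    """Yield the path of a temporary file holding the decoded certificate."""
    cert = base64.b64decode(b64_cert_data)
    fd, path = tempfile.mkstemp(suffix=".pem")
    try:
        # Write through the descriptor mkstemp returned; leaving this inner
        # block closes it, so no handle is open when the path is used.
        with os.fdopen(fd, 'wb') as fh:
            fh.write(cert)
        yield path
    finally:
        # Remove the file whether or not the caller's block raised.
        os.remove(path)
```

A caller would then use `with temp_ca_cert(data) as path:` and assign `path` wherever a CA bundle path is expected for the duration of the block.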

@roycaihw
Member

Thanks for flagging this and providing the thoughtful solutions! Could you clarify in what scenario this client tries to open the same tempfile from multiple processes? Could you provide a reproduction?

@struie
Author

struie commented Jan 28, 2025

My use case involves an open-source IAM provider.
On my Windows desktop I log in with my AD account and get my credentials, which I then use to update my kubeconfig file using the method prescribed by the IAM provider. The details are unimportant; what matters is that my KUBECONFIG file now contains my OIDC credentials.

I believe the issue is not about multiple processes opening the file but about Windows file locking, a common problem on Windows.

 ca_cert = tempfile.NamedTemporaryFile(delete=True) 

Here, Windows creates the temporary file and keeps it open, so when the code later runs:

 with open(ca_cert.name, 'w') as fh: 
     fh.write(cert) 

We encounter the error because the file is already open.

To get around this, I have monkey-patched the _refresh_oidc method with the following minor changes. I have been using it for over a month now and it seems to be working for me.

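    def _refresh_oidc(self, provider):
        config = Configuration()
        # Initialised up front so the finally block can test it even when
        # no temporary file was created (the elif/else branches below).
        ca_cert_path = None

        try:

            if 'idp-certificate-authority-data' in provider['config']:
                # ca_cert = tempfile.NamedTemporaryFile(delete=True)
                # mkstemp leaves no handle open once the descriptor is closed.
                fd, ca_cert_path = tempfile.mkstemp()
                os.close(fd)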

                if PY3:
                    cert = base64.b64decode(
                        provider['config']['idp-certificate-authority-data']
                    ).decode('utf-8')
                else:
                    cert = base64.b64decode(
                        provider['config']['idp-certificate-authority-data'] + "=="
                    )

                with open(ca_cert_path, 'w') as fh:
                    fh.write(cert)

                config.ssl_ca_cert = ca_cert_path

            elif 'idp-certificate-authority' in provider['config']:
                config.ssl_ca_cert = provider['config']['idp-certificate-authority']

            else:
                config.verify_ssl = False

            client = ApiClient(configuration=config)

            response = client.request(
                method="GET",
                url="%s/.well-known/openid-configuration"
                % provider['config']['idp-issuer-url']
            )

            if response.status != 200:
                return

            response = json.loads(response.data)

            request = OAuth2Session(
                client_id=provider['config']['client-id'],
                token=provider['config']['refresh-token'],
                auto_refresh_kwargs={
                    'client_id': provider['config']['client-id'],
                    'client_secret': provider['config']['client-secret']
                },
                auto_refresh_url=response['token_endpoint']
            )

            try:
                refresh = request.refresh_token(
                    token_url=response['token_endpoint'],
                    refresh_token=provider['config']['refresh-token'],
                    auth=(provider['config']['client-id'],
                          provider['config']['client-secret']),
                    verify=config.ssl_ca_cert if config.verify_ssl else None
                )
            except oauthlib.oauth2.rfc6749.errors.InvalidClientIdError:
                return

            provider['config'].value['id-token'] = refresh['id_token']
            provider['config'].value['refresh-token'] = refresh['refresh_token']
        finally:
            if ca_cert_path and os.path.exists(ca_cert_path):
                os.remove(ca_cert_path)

@struie
Author

struie commented Jan 28, 2025

@roycaihw I hope this helps clarify.
