page_title confluent_flink_statement Resource - terraform-provider-confluent
subcategory
description

confluent_flink_statement Resource

General Availability

-> Note: It is recommended to set lifecycle { prevent_destroy = true } on production instances to prevent accidental statement deletion. This setting rejects plans that would destroy or recreate the statement, such as attempting to change uneditable attributes. Read more about it in the Terraform docs.

Example Usage

Option #1: Manage multiple Flink Compute Pools in the same Terraform workspace

provider "confluent" {
  cloud_api_key    = var.confluent_cloud_api_key    # optionally use CONFLUENT_CLOUD_API_KEY env var
  cloud_api_secret = var.confluent_cloud_api_secret # optionally use CONFLUENT_CLOUD_API_SECRET env var
}

resource "confluent_flink_statement" "random_int_table" {
  organization {
    id = data.confluent_organization.main.id
  }
  environment {
    id = data.confluent_environment.staging.id
  }
  compute_pool {
    id = confluent_flink_compute_pool.example.id
  }
  principal {
    id = confluent_service_account.app-manager-flink.id
  }
  statement  = "CREATE TABLE random_int_table(ts TIMESTAMP_LTZ(3), random_value INT);"
  properties = {
    "sql.current-catalog"  = data.confluent_environment.staging.display_name
    "sql.current-database" = data.confluent_kafka_cluster.example.display_name
  }
  # Use data.confluent_flink_region.main.rest_endpoint for Basic, Standard, public Dedicated Kafka clusters
  # For private networking use
  # data.confluent_flink_region.main.private_rest_endpoint
  # or
  # "https://flink${data.confluent_network.main.endpoint_suffix}"
  rest_endpoint = data.confluent_flink_region.main.rest_endpoint
  credentials {
    key    = confluent_api_key.env-admin-flink-api-key.id
    secret = confluent_api_key.env-admin-flink-api-key.secret
  }

  lifecycle {
    prevent_destroy = true
  }
}
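Option #1 refers to several data sources that the snippet above does not declare. A minimal sketch of those declarations follows. The IDs, cloud, and region are hypothetical placeholders, and confluent_flink_compute_pool.example, confluent_service_account.app-manager-flink, and confluent_api_key.env-admin-flink-api-key are assumed to be defined elsewhere in the same workspace.

```hcl
# All IDs, the cloud, and the region below are hypothetical placeholders.
data "confluent_organization" "main" {}

data "confluent_environment" "staging" {
  id = "env-abc123"
}

data "confluent_kafka_cluster" "example" {
  id = "lkc-abc123"
  environment {
    id = data.confluent_environment.staging.id
  }
}

# Provides rest_endpoint (and private_rest_endpoint) for the statement.
data "confluent_flink_region" "main" {
  cloud  = "AWS"
  region = "us-east-1"
}
```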

Option #2: Manage a single Flink Compute Pool in the same Terraform workspace

provider "confluent" {
  organization_id       = var.organization_id            # optionally use CONFLUENT_ORGANIZATION_ID env var
  environment_id        = var.environment_id             # optionally use CONFLUENT_ENVIRONMENT_ID env var
  flink_compute_pool_id = var.flink_compute_pool_id      # optionally use FLINK_COMPUTE_POOL_ID env var
  flink_rest_endpoint   = var.flink_rest_endpoint        # optionally use FLINK_REST_ENDPOINT env var
  flink_api_key         = var.flink_api_key              # optionally use FLINK_API_KEY env var
  flink_api_secret      = var.flink_api_secret           # optionally use FLINK_API_SECRET env var
  flink_principal_id    = var.flink_principal_id         # optionally use FLINK_PRINCIPAL_ID env var
}

resource "confluent_flink_statement" "example" {
  statement  = "CREATE TABLE random_int_table(ts TIMESTAMP_LTZ(3), random_value INT);"
  properties = {
    "sql.current-catalog"  = var.confluent_environment_display_name
    "sql.current-database" = var.confluent_kafka_cluster_display_name
  }

  lifecycle {
    prevent_destroy = true
  }
}
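The Option #2 configuration reads seven provider variables and two display-name variables. A sketch of matching declarations follows, assuming you pass values through a .tfvars file or TF_VAR_<name> environment variables rather than the provider's own environment variables (in which case the provider-related declarations are unnecessary). Marking flink_api_secret as sensitive is a suggested choice; it only redacts the value in plan and apply output.

```hcl
variable "organization_id" { type = string }
variable "environment_id" { type = string }
variable "flink_compute_pool_id" { type = string }
variable "flink_rest_endpoint" { type = string }
variable "flink_api_key" { type = string }
variable "flink_principal_id" { type = string }
variable "confluent_environment_display_name" { type = string }
variable "confluent_kafka_cluster_display_name" { type = string }

variable "flink_api_secret" {
  type      = string
  sensitive = true # redacted in CLI output
}
```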

Example of confluent_flink_statement that creates a model:

resource "confluent_flink_statement" "example" {
  statement  = "CREATE MODEL `vector_encoding` INPUT (input STRING) OUTPUT (vector ARRAY<FLOAT>) WITH( 'TASK' = 'embedding','PROVIDER' = 'OPENAI','OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings','OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}');"
  properties = {
    "sql.current-catalog"  = var.confluent_environment_display_name
    "sql.current-database" = var.confluent_kafka_cluster_display_name
  }
  properties_sensitive = {
    "sql.secrets.openaikey" = "***REDACTED***"
  }
  lifecycle {
    prevent_destroy = true
  }
}
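Rather than writing the OpenAI key into the configuration, the sensitive property can come from an input variable. A minimal sketch, assuming a hypothetical variable named openai_api_key supplied through TF_VAR_openai_api_key; set properties_sensitive = local.model_secrets in the model statement above.

```hcl
# Hypothetical variable holding the OpenAI API key.
variable "openai_api_key" {
  type      = string
  sensitive = true
}

locals {
  # Map to pass as properties_sensitive in the CREATE MODEL statement.
  # The key matches the {{sessionconfig/sql.secrets.openaikey}} reference.
  model_secrets = {
    "sql.secrets.openaikey" = var.openai_api_key
  }
}
```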

Argument Reference

The following arguments are supported:

  • organization - (Optional Configuration Block) supports the following:
    • id - (Required String) The ID of the Organization, for example, 1111aaaa-11aa-11aa-11aa-111111aaaaaa.
  • environment - (Optional Configuration Block) supports the following:
    • id - (Required String) The ID of the Environment, for example, env-abc123.
  • compute_pool - (Optional Configuration Block) supports the following:
    • id - (Required String) The ID of the Flink Compute Pool, for example, lfcp-abc123.
  • principal - (Optional Configuration Block) supports the following:
    • id - (Required String) The ID of the Principal the Flink Statement runs as, for example, sa-abc123.
  • statement - (Required String) The raw SQL text statement, for example, SELECT CURRENT_TIMESTAMP;.
  • statement_name - (Optional String) The ID of the Flink Statement, for example, cfeab4fe-b62c-49bd-9e99-51cc98c77a67.
  • rest_endpoint - (Optional String) The REST endpoint of the Flink region. For example, for public networking: https://flink.us-east-1.aws.confluent.cloud. In the case of private networking, the endpoint might look like https://flink.pr1jy6.us-east-2.aws.confluent.cloud. You can construct it using either:
    • data.confluent_flink_region.main.private_rest_endpoint, or
    • https://flink${data.confluent_network.main.endpoint_suffix}
  • credentials - (Optional Configuration Block) supports the following:
    • key - (Required String) The Flink API Key.
    • secret - (Required String, Sensitive) The Flink API Secret.
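For a private-networking setup, the Option #1 resource changes mainly in its rest_endpoint. A sketch follows, assuming the data sources and resources from the Option #1 example exist; the statement_name value is hypothetical and shown only to illustrate setting the name explicitly.

```hcl
resource "confluent_flink_statement" "private_example" {
  organization {
    id = data.confluent_organization.main.id
  }
  environment {
    id = data.confluent_environment.staging.id
  }
  compute_pool {
    id = confluent_flink_compute_pool.example.id
  }
  principal {
    id = confluent_service_account.app-manager-flink.id
  }
  statement_name = "select-current-timestamp" # hypothetical name
  statement      = "SELECT CURRENT_TIMESTAMP;"
  # Private networking: use the region's private endpoint instead of rest_endpoint.
  rest_endpoint = data.confluent_flink_region.main.private_rest_endpoint
  credentials {
    key    = confluent_api_key.env-admin-flink-api-key.id
    secret = confluent_api_key.env-admin-flink-api-key.secret
  }
}
```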

-> Note: A Flink API key consists of a key and a secret. Flink API keys are required to interact with Flink Statements in Confluent Cloud. Each Flink API key is valid for one specific Flink Region.

-> Note: Use Option #2 to simplify key rotation. With Option #1, to rotate a Flink API key, create a new Flink API key, update the credentials block in all configuration files to use it, run terraform apply -target="confluent_flink_statement.example", and then remove the old Flink API key. If the old Flink API key has already been deleted, you might need to run terraform plan -refresh=false -target="confluent_flink_statement.example" -out=rotate-flink-api-key followed by terraform apply rotate-flink-api-key instead.
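During an Option #1 rotation, the only edit inside confluent_flink_statement.example is the credentials block. The fragment below assumes a hypothetical new key resource named confluent_api_key.env-admin-flink-api-key-v2, created before the old key is removed.

```hcl
# Fragment: replaces the credentials block inside
# resource "confluent_flink_statement" "example".
# env-admin-flink-api-key-v2 is a hypothetical new Flink API key.
credentials {
  key    = confluent_api_key.env-admin-flink-api-key-v2.id
  secret = confluent_api_key.env-admin-flink-api-key-v2.secret
}
```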

  • properties - (Optional Map) The custom statement properties to set:
    • name - (Required String) The property name, for example, sql.local-time-zone.
    • value - (Required String) The property value, for example, GMT-08:00.
  • properties_sensitive - (Optional Map) The sensitive statement properties to set, such as secrets referenced by the statement:
    • name - (Required String) The property name, for example, sql.secrets.openaikey.
    • value - (Required String) The property value, for example, s1234.
  • stopped - (Optional Boolean) Indicates whether the Flink Statement is stopped and controls whether it should be stopped or resumed. Defaults to false. Set it to true to stop the statement, then set it back to false to resume it.
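A sketch combining the properties map and the stopped flag, in the Option #2 style. The source table random_int_source is hypothetical; sql.local-time-zone and GMT-08:00 are the example property name and value listed above.

```hcl
resource "confluent_flink_statement" "insert_random_values" {
  # random_int_source is a hypothetical source table.
  statement = "INSERT INTO random_int_table SELECT ts, random_value FROM random_int_source;"
  properties = {
    "sql.current-catalog"  = var.confluent_environment_display_name
    "sql.current-database" = var.confluent_kafka_cluster_display_name
    "sql.local-time-zone"  = "GMT-08:00"
  }
  # Change to true (and nothing else) to stop; back to false to resume.
  stopped = false

  lifecycle {
    prevent_destroy = true
  }
}
```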

!> Note: When stopping a running statement, you cannot update any argument other than stopped.

!> Note: When resuming a stopped statement, you can update principal.id and/or compute_pool.id in addition to the stopped attribute. This lets the statement run under a different principal (with the appropriate role assignment) or on a different Flink compute pool (as long as it is in the same Flink region as the original).

!> Note: Currently, only three types of Flink statements support resuming: CREATE TABLE AS, INSERT INTO, and EXECUTE STATEMENT SET.
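The stop-then-resume flow takes two applies. A sketch in the Option #1 style follows, assuming the Option #1 data sources exist; confluent_flink_compute_pool.secondary and random_int_source are hypothetical, and the second pool must be in the same Flink region.

```hcl
# Apply 1: set stopped = true with no other change to stop the statement.
# Apply 2 (shown): set stopped = false and point compute_pool at the new pool.
resource "confluent_flink_statement" "insert_random_values" {
  organization {
    id = data.confluent_organization.main.id
  }
  environment {
    id = data.confluent_environment.staging.id
  }
  compute_pool {
    # Hypothetical second pool in the same Flink region.
    id = confluent_flink_compute_pool.secondary.id
  }
  principal {
    id = confluent_service_account.app-manager-flink.id
  }
  # INSERT INTO is one of the statement types that support resuming.
  statement = "INSERT INTO random_int_table SELECT ts, random_value FROM random_int_source;"
  properties = {
    "sql.current-catalog"  = data.confluent_environment.staging.display_name
    "sql.current-database" = data.confluent_kafka_cluster.example.display_name
  }
  rest_endpoint = data.confluent_flink_region.main.rest_endpoint
  credentials {
    key    = confluent_api_key.env-admin-flink-api-key.id
    secret = confluent_api_key.env-admin-flink-api-key.secret
  }
  stopped = false
}
```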

!> Warning: Use Option #2 to avoid exposing sensitive credentials in the state file. With Option #1, Terraform doesn't encrypt the sensitive credentials values of the confluent_flink_statement resource, so you must keep your state file secure. Refer to the Terraform documentation to learn more about securing your state file.

-> Note: When using OAuth to authenticate a Flink statement, if the intended principal.id is a service account instead of an Identity Pool, make sure the Identity Pool has an Assigner role binding on the service account. Otherwise, you may encounter a 403 Forbidden error. For example:

resource "confluent_role_binding" "identity-pool-assigner" {
  principal   = "User:pool-abc123"
  role_name   = "Assigner"
  crn_pattern = "${data.confluent_organization.main.resource_name}/service-account=sa-def456"
}

Attributes Reference

In addition to the preceding arguments, the following attributes are exported:

  • id - (Required String) The ID of the Flink statement, in the format <Environment ID>/<Flink Compute Pool ID>/<Flink Statement name>, for example, env-abc123/lfcp-xyz123/cfeab4fe-b62c-49bd-9e99-51cc98c77a67.
  • latest_offsets - (Optional Map) The last Kafka offsets that a statement has processed, represented as a map from each Kafka topic to a string of partition-to-offset pairs. For example,
"latest_offsets": {
  "topic-1": "partition:0,offset:100;partition:1,offset:200",
  "topic-2": "partition:0,offset:50"
}
  • latest_offsets_timestamp - (Optional String) The date and time at which the Kafka topic offsets were added to the statement status. It is represented in RFC3339 format and is in UTC. For example, 2023-03-31T00:00:00-00:00.

!> Note: The values for the latest_offsets and latest_offsets_timestamp attributes are populated only for stopped statements.

-> Note: To start a statement from the last offsets of a previous statement, you can automatically reuse the offsets of a previous statement as documented in the flink-carry-over-offset-between-statements example, or inject latest_offsets as a SQL hint.
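One way to inject latest_offsets as a SQL hint is to interpolate the attribute into a dynamic table options hint. This is a sketch under several assumptions: confluent_flink_statement.previous is a hypothetical stopped statement (so its offsets are populated), `topic-1` and sink_table are hypothetical tables, and the Kafka startup options scan.startup.mode and scan.startup.specific-offsets are assumed to accept the partition:N,offset:M format shown above.

```hcl
resource "confluent_flink_statement" "resumed_from_offsets" {
  # Hypothetical names: `topic-1` (source), sink_table (target), and
  # confluent_flink_statement.previous (a stopped statement).
  statement  = "INSERT INTO sink_table SELECT * FROM `topic-1` /*+ OPTIONS('scan.startup.mode' = 'specific-offsets', 'scan.startup.specific-offsets' = '${confluent_flink_statement.previous.latest_offsets["topic-1"]}') */;"
  properties = {
    "sql.current-catalog"  = var.confluent_environment_display_name
    "sql.current-database" = var.confluent_kafka_cluster_display_name
  }
}
```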

Import

You can import a Flink statement by using the Flink Statement name, for example:

# Option #1: Manage multiple Flink Compute Pools in the same Terraform workspace
$ export IMPORT_CONFLUENT_ORGANIZATION_ID="<organization_id>"
$ export IMPORT_CONFLUENT_ENVIRONMENT_ID="<environment_id>"
$ export IMPORT_FLINK_COMPUTE_POOL_ID="<flink_compute_pool_id>"
$ export IMPORT_FLINK_API_KEY="<flink_api_key>"
$ export IMPORT_FLINK_API_SECRET="<flink_api_secret>"
$ export IMPORT_FLINK_REST_ENDPOINT="<flink_rest_endpoint>"
$ export IMPORT_FLINK_PRINCIPAL_ID="<flink_principal_id>"
$ terraform import confluent_flink_statement.example cfeab4fe-b62c-49bd-9e99-51cc98c77a67

# Option #2: Manage a single Flink Compute Pool in the same Terraform workspace
$ terraform import confluent_flink_statement.example cfeab4fe-b62c-49bd-9e99-51cc98c77a67

!> Warning: Do not forget to delete terminal command history afterwards for security purposes.
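On Terraform 1.5 or later, a configuration-driven import block is an alternative to the terraform import command. A sketch follows, assuming the provider handles it the same way as the CLI import; for Option #1, the IMPORT_* environment variables listed above would presumably still need to be exported before running terraform plan.

```hcl
# Requires Terraform 1.5+. The ID is the Flink statement name.
import {
  to = confluent_flink_statement.example
  id = "cfeab4fe-b62c-49bd-9e99-51cc98c77a67"
}
```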

Getting Started

The following end-to-end example might help to get started with Flink Statements: