diff --git a/.github/weaviate-compose.yml b/.github/weaviate-compose.yml
new file mode 100644
index 0000000000..8bbedb7b23
--- /dev/null
+++ b/.github/weaviate-compose.yml
@@ -0,0 +1,34 @@
+version: '3.4'
+services:
+  weaviate:
+    command:
+    - --host
+    - 0.0.0.0
+    - --port
+    - '8080'
+    - --scheme
+    - http
+    image: semitechnologies/weaviate:1.21.1
+    ports:
+    - 8080:8080
+    volumes:
+    - weaviate_data
+    restart: on-failure:0
+    environment:
+      QUERY_DEFAULTS_LIMIT: 25
+      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
+      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
+      DEFAULT_VECTORIZER_MODULE: 'none'
+      ENABLE_MODULES: 'text2vec-contextionary,text2vec-cohere,text2vec-huggingface,text2vec-palm,text2vec-openai,generative-openai,generative-cohere,generative-palm,ref2vec-centroid,reranker-cohere,qna-openai'
+      CONTEXTIONARY_URL: contextionary:9999
+      CLUSTER_HOSTNAME: 'node1'
+  contextionary:
+    environment:
+      OCCURRENCE_WEIGHT_LINEAR_FACTOR: 0.75
+      EXTENSIONS_STORAGE_MODE: weaviate
+      EXTENSIONS_STORAGE_ORIGIN: http://weaviate:8080
+      NEIGHBOR_OCCURRENCE_IGNORE_PERCENTILE: 5
+      ENABLE_COMPOUND_SPLITTING: 'false'
+    image: semitechnologies/contextionary:en0.16.0-v1.2.1
+    ports:
+    - 9999:9999
diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml
index 162b8d9d6f..50d973bad4 100644
--- a/.github/workflows/test_local_destinations.yml
+++ b/.github/workflows/test_local_destinations.yml
@@ -16,9 +16,12 @@ env:
   RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
   RUNTIME__LOG_LEVEL: ERROR
   RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
-  ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\"]"
+  ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]"
   ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"
 
+  DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
+  DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"
+
 jobs:
   get_docs_changes:
     uses: ./.github/workflows/get_docs_changes.yml
@@ -58,6 +61,9 @@ jobs:
      - name: Check out
        uses: actions/checkout@master

+     - name: Start weaviate
+       run: docker-compose -f ".github/weaviate-compose.yml" up -d
+
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
@@ -78,9 +84,13 @@ jobs:
        key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

      - name: Install dependencies
-       run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli
+       run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate

      - run: poetry run pytest tests/load tests/cli
        name: Run tests Linux
        env:
          DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
+
+     - name: Stop weaviate
+       if: always()
+       run: docker-compose -f ".github/weaviate-compose.yml" down -v
diff --git a/dlt/destinations/weaviate/configuration.py b/dlt/destinations/weaviate/configuration.py
index 3f4c5f0a09..054e8bef25 100644
--- a/dlt/destinations/weaviate/configuration.py
+++ b/dlt/destinations/weaviate/configuration.py
@@ -12,8 +12,8 @@

 @configspec
 class WeaviateCredentials(CredentialsConfiguration):
-    url: str
-    api_key: str
+    url: str = "http://localhost:8080"
+    api_key: Optional[str]
     additional_headers: Optional[Dict[str, str]] = None

     def __str__(self) -> str:
@@ -32,8 +32,11 @@ class WeaviateClientConfiguration(DestinationClientDwhConfiguration):
     batch_workers: int = 1
     batch_consistency: TWeaviateBatchConsistency = "ONE"
     batch_retries: int = 5
-    conn_timeout: int = 10
-    read_timeout: int = 3*60
+
+    conn_timeout: float = 10.0
+    read_timeout: float = 3*60.0
+    startup_period: int = 5
+    dataset_separator: str = "_"

     credentials: WeaviateCredentials

diff --git a/dlt/destinations/weaviate/weaviate_client.py b/dlt/destinations/weaviate/weaviate_client.py
index 8d418680b1..eb00a788c4 100644
--- a/dlt/destinations/weaviate/weaviate_client.py
+++ b/dlt/destinations/weaviate/weaviate_client.py
@@ -69,6 +69,13 @@
     "blob": "binary",
 }

+NON_VECTORIZED_CLASS = {
+    "vectorizer": "none",
+    "vectorIndexConfig": {
+        "skip": True,
+    }
+}
+

 def wrap_weaviate_error(f: TFun) -> TFun:
     @wraps(f)
@@ -243,9 +250,12 @@ def sentinel_class(self) -> str:

     @staticmethod
     def create_db_client(config: WeaviateClientConfiguration) -> weaviate.Client:
+        auth_client_secret: weaviate.AuthApiKey = weaviate.AuthApiKey(api_key=config.credentials.api_key) if config.credentials.api_key else None
         return weaviate.Client(
             url=config.credentials.url,
-            auth_client_secret=weaviate.AuthApiKey(api_key=config.credentials.api_key),
+            timeout_config=(config.conn_timeout, config.read_timeout),
+            startup_period=config.startup_period,
+            auth_client_secret=auth_client_secret,
             additional_headers=config.credentials.additional_headers,
         )

@@ -389,7 +399,7 @@ def is_storage_initialized(self) -> bool:

     def create_sentinel_class(self) -> None:
         """Create an empty class to indicate that the storage is initialized."""
-        self.create_class({}, full_class_name=self.sentinel_class)
+        self.create_class(NON_VECTORIZED_CLASS, full_class_name=self.sentinel_class)

     def delete_sentinel_class(self) -> None:
         """Delete the sentinel class."""
@@ -429,7 +439,7 @@ def _execute_schema_update(self, only_tables: Iterable[str]) -> None:
             if len(new_columns) > 0:
                 if exists:
                     for column in new_columns:
-                        prop = self._make_property_schema(column["name"], column, True)
+                        prop = self._make_property_schema(column["name"], column)
                         self.create_class_property(table_name, prop)
                 else:
                     class_schema = self.make_weaviate_class_schema(table_name)
@@ -488,49 +498,48 @@ def get_schema_by_hash(self, schema_hash: str) -> Optional[StorageSchemaInfo]:

     def make_weaviate_class_schema(self, table_name: str) -> Dict[str, Any]:
         """Creates a Weaviate class schema from a table schema."""
-        if table_name.startswith(self.schema._dlt_tables_prefix):
-            return self._make_non_vectorized_class_schema(table_name)
+        class_schema: Dict[str, Any] = {
+            "class": table_name,
+            "properties": self._make_properties(table_name),
+        }

-        return self._make_vectorized_class_schema(table_name)
+        # check if any column requires vectorization
+        if get_columns_names_with_prop(self.schema.get_table(table_name), VECTORIZE_HINT):  # type: ignore
+            class_schema.update(self._vectorizer_config)
+        else:
+            class_schema.update(NON_VECTORIZED_CLASS)

-    def _make_properties(
-        self, table_name: str, is_vectorized_class: bool = True
-    ) -> List[Dict[str, Any]]:
+        return class_schema
+
+    def _make_properties(self, table_name: str) -> List[Dict[str, Any]]:
         """Creates a Weaviate properties schema from a table schema.

         Args:
             table: The table name for which columns should be converted to properties
-            is_vectorized_class: Controls whether the `moduleConfig` should be
-                added to the properties schema. This is only needed for
-                vectorized classes.
         """
         return [
-            self._make_property_schema(column_name, column, is_vectorized_class)
+            self._make_property_schema(column_name, column)
             for column_name, column in self.schema.get_table_columns(table_name).items()
         ]

-    def _make_property_schema(
-        self, column_name: str, column: TColumnSchema, is_vectorized_class: bool
-    ) -> Dict[str, Any]:
+    def _make_property_schema(self, column_name: str, column: TColumnSchema) -> Dict[str, Any]:
         extra_kv = {}

-        if is_vectorized_class:
-            vectorizer_name = self._vectorizer_config["vectorizer"]
-
-            # x-weaviate-vectorize: (bool) means that this field should be vectorized
-            if not column.get(VECTORIZE_HINT, False):
-                # do not vectorize
-                extra_kv["moduleConfig"] = {
-                    vectorizer_name: {
-                        "skip": True,
-                    }
+        vectorizer_name = self._vectorizer_config["vectorizer"]
+        # x-weaviate-vectorize: (bool) means that this field should be vectorized
+        if not column.get(VECTORIZE_HINT, False):
+            # tell Weaviate explicitly not to vectorize columns without the vectorize hint
+            extra_kv["moduleConfig"] = {
+                vectorizer_name: {
+                    "skip": True,
                 }
+            }

-            # x-weaviate-tokenization: (str) specifies the method to use
-            # for tokenization
-            if TOKENIZATION_HINT in column:
-                extra_kv["tokenization"] = column[TOKENIZATION_HINT]  # type: ignore
+        # x-weaviate-tokenization: (str) specifies the method to use
+        # for tokenization
+        if TOKENIZATION_HINT in column:
+            extra_kv["tokenization"] = column[TOKENIZATION_HINT]  # type: ignore

         return {
             "name": column_name,
@@ -538,27 +547,6 @@ def _make_property_schema(
             **extra_kv,
         }

-    def _make_vectorized_class_schema(self, table_name: str) -> Dict[str, Any]:
-        properties = self._make_properties(table_name)
-
-        return {
-            "class": table_name,
-            "properties": properties,
-            **self._vectorizer_config,
-        }
-
-    def _make_non_vectorized_class_schema(self, table_name: str) -> Dict[str, Any]:
-        properties = self._make_properties(table_name, is_vectorized_class=False)
-
-        return {
-            "class": table_name,
-            "properties": properties,
-            "vectorizer": "none",
-            "vectorIndexConfig": {
-                "skip": True,
-            },
-        }
-
     def start_file_load(
         self, table: TTableSchema, file_path: str, load_id: str
     ) -> LoadJob:
diff --git a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
index 131a9b49cd..04c71ffd96 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
@@ -28,7 +28,16 @@ api_key = "your-weaviate-api-key"
 X-OpenAI-Api-Key = "your-openai-api-key"
 ```

-In this setup guide, we are using the [Weaviate Cloud Services](https://console.weaviate.cloud/) to get a Weaviate instance and [OpenAI API](https://platform.openai.com/) for generating embeddings through the [text2vec-openai](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules/text2vec-openai) module. You can host your own weaviate instance using docker compose, kubernetes or embedded. Refer to Weaviate's [How-to: Install](https://weaviate.io/developers/weaviate/installation) for details.
+In this setup guide, we are using the [Weaviate Cloud Services](https://console.weaviate.cloud/) to get a Weaviate instance and [OpenAI API](https://platform.openai.com/) for generating embeddings through the [text2vec-openai](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules/text2vec-openai) module.
+
+You can also host your own Weaviate instance using Docker Compose, Kubernetes, or Embedded Weaviate. Refer to Weaviate's [How-to: Install](https://weaviate.io/developers/weaviate/installation) for details. In that case you can skip the `url` and `api_key` credentials and keep only the additional headers:
+
+```toml
+[destination.weaviate.credentials.additional_headers]
+X-OpenAI-Api-Key = "your-openai-api-key"
+```
+The `url` then defaults to **http://localhost:8080** and `api_key` stays undefined, which matches the defaults of a local Weaviate container.
+
 3. Define the source of the data. For starters, let's load some data from a simple data structure:
@@ -223,14 +232,24 @@ Reserved property names like `id` or `additional` are prefixed with underscores

 - `batch_size`: (int) the number of items in the batch insert request. The default is 100.
 - `batch_workers`: (int) the maximal number of concurrent threads to run batch import. The default is 1.
-- batch_consistency: (str) the number of replica nodes in the cluster that must acknowledge a write or read request before it's considered successful. The available consistency levels include:
+- `batch_consistency`: (str) the number of replica nodes in the cluster that must acknowledge a write or read request before it's considered successful. The available consistency levels include:
   - `ONE`: Only one replica node needs to acknowledge.
   - `QUORUM`: Majority of replica nodes (calculated as `replication_factor / 2 + 1`) must acknowledge.
   - `ALL`: All replica nodes in the cluster must send a successful response.

   The default is `ONE`.
-- batch_retries: (int) number of retries to create a batch that failed with ReadTimeout. The default is 5.
-- dataset_separator: (str) the separator to use when generating the class names in Weaviate.
-- vectorizer: (str) the name of [the vectorizer](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules) to use. The default is `text2vec-openai`.
+- `batch_retries`: (int) number of retries to create a batch that failed with `ReadTimeout`. The default is 5.
+- `dataset_separator`: (str) the separator to use when generating the class names in Weaviate.
+- `conn_timeout` and `read_timeout`: (float) timeouts (in seconds) used when connecting to and reading from the REST API. The defaults are 10.0 and 180.0.
+- `startup_period`: (int) how long (in seconds) to wait for Weaviate to start. The default is 5.
+- `vectorizer`: (str) the name of [the vectorizer](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules) to use. The default is `text2vec-openai`.
+- `module_config`: (dict) configuration of various Weaviate modules.
+
+Below is an example that configures the **contextionary** vectorizer. You can put it into `config.toml` since it contains no secrets:
+```toml
+[destination.weaviate]
+vectorizer="text2vec-contextionary"
+module_config={text2vec-contextionary = { vectorizeClassName = false, vectorizePropertyName = true}}
+```

 ### dbt support
diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py
index 26b28366c2..a8a5bd21b2 100644
--- a/tests/load/weaviate/test_pipeline.py
+++ b/tests/load/weaviate/test_pipeline.py
@@ -283,16 +283,19 @@ def test_merge_github_nested() -> None:
     assert_class(p, "Issues", expected_items_count=17)


-@pytest.mark.skip(reason="skip to avoid race condition with other tests")
 def test_empty_dataset_allowed() -> None:
     # weaviate dataset_name is optional so dataset name won't be autogenerated when not explicitly passed
     p = dlt.pipeline(destination="weaviate", full_refresh=True)
+    # only run against a local Weaviate instance
+    client: WeaviateClient = p._destination_client()
+    if "localhost" not in client.config.credentials.url:
+        pytest.skip("skip to avoid race condition with other tests")
+
     assert p.dataset_name is None
-    info = p.run(weaviate_adapter(["a", "b", "c"], vectorize=["value"]))
+    info = p.run(weaviate_adapter(["context", "created", "not a stop word"], vectorize=["value"]))
     # dataset in load info is empty
     assert info.dataset_name is None
-    # check weaviate client props
-    client: WeaviateClient = p._get_destination_client(p.default_schema)
+    client = p._destination_client()
     assert client.dataset_name is None
     assert client.sentinel_class == "DltSentinelClass"
     # also check trace
@@ -300,11 +303,15 @@ def test_empty_dataset_allowed() -> None:
     assert_class(p, "Content", expected_items_count=3)


-@pytest.mark.skip(reason="skip to avoid race condition with other tests")
 def test_vectorize_property_without_data() -> None:
     # we request to vectorize "content" but property with this name does not appear in the data
     # an incomplete column was created and it can't be created at destination
     p = dlt.pipeline(destination="weaviate", full_refresh=True)
+    # only run against a local Weaviate instance
+    client: WeaviateClient = p._destination_client()
+    if "localhost" not in client.config.credentials.url:
+        pytest.skip("skip to avoid race condition with other tests")
+
     assert p.dataset_name is None
     info = p.run(weaviate_adapter(["a", "b", "c"], vectorize=["content"]))
     # dataset in load info is empty
diff --git a/tests/load/weaviate/utils.py b/tests/load/weaviate/utils.py
index 20c4d93edb..0f4e42cc5c 100644
--- a/tests/load/weaviate/utils.py
+++ b/tests/load/weaviate/utils.py
@@ -4,6 +4,7 @@
 import dlt
 from dlt.common.pipeline import PipelineContext
 from dlt.common.configuration.container import Container
+from dlt.common.schema.utils import get_columns_names_with_prop

 from dlt.destinations.weaviate.weaviate_client import WeaviateClient
 from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT
@@ -22,6 +23,7 @@ def assert_class(
     items: List[Any] = None,
 ) -> None:
     client: WeaviateClient = pipeline._destination_client()
+    vectorizer_name: str = client._vectorizer_config["vectorizer"]

     # Check if class exists
     schema = client.get_class_schema(class_name)
@@ -35,14 +37,19 @@
     # make sure expected columns are vectorized
     for column_name, column in columns.items():
         prop = properties[column_name]
-        # text2vec-openai is the default
-        assert prop["moduleConfig"]["text2vec-openai"]["skip"] == (
+        assert prop["moduleConfig"][vectorizer_name]["skip"] == (
             not column.get(VECTORIZE_HINT, False)
         )
         # tokenization
         if TOKENIZATION_HINT in column:
             assert prop["tokenization"] == column[TOKENIZATION_HINT]

+    # if at least one column has a vectorize hint, the class must have the vectorizer enabled
+    if get_columns_names_with_prop(pipeline.default_schema.get_table(class_name), VECTORIZE_HINT):
+        assert schema["vectorizer"] == vectorizer_name
+    else:
+        assert schema["vectorizer"] == "none"
+
     # response = db_client.query.get(class_name, list(properties.keys())).do()
     response = client.query_class(class_name, list(properties.keys())).do()
     objects = response["data"]["Get"][client.make_full_name(class_name)]
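
For reference, here is a minimal sketch of what the new client construction in `create_db_client` amounts to, assuming the `weaviate-client` 3.x API that this patch targets. The `make_client` helper, the cluster URL, and the key values are illustrative only and not part of the patch:

```python
from typing import Dict, Optional

import weaviate


def make_client(
    url: str = "http://localhost:8080",
    api_key: Optional[str] = None,
    conn_timeout: float = 10.0,
    read_timeout: float = 180.0,
    startup_period: int = 5,
    additional_headers: Optional[Dict[str, str]] = None,
) -> weaviate.Client:
    # Build an auth object only when an API key is configured; a local
    # anonymous-access container (as started by .github/weaviate-compose.yml)
    # needs none, which is why the patch makes auth_client_secret optional.
    auth = weaviate.AuthApiKey(api_key=api_key) if api_key else None
    # Constructing the client connects to the instance and waits up to
    # `startup_period` seconds for it to become ready.
    return weaviate.Client(
        url=url,
        auth_client_secret=auth,
        timeout_config=(conn_timeout, read_timeout),
        startup_period=startup_period,
        additional_headers=additional_headers,
    )


# local instance with defaults matching WeaviateCredentials
local_client = make_client()

# hosted instance with an API key plus an OpenAI key for text2vec-openai
cloud_client = make_client(
    url="https://your-cluster.weaviate.network",
    api_key="your-weaviate-api-key",
    additional_headers={"X-OpenAI-Api-Key": "your-openai-api-key"},
)
```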
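The class-schema refactor folds the former `_make_vectorized_class_schema` / `_make_non_vectorized_class_schema` pair into a single path: vectorization is decided per class by the presence of the `x-weaviate-vectorize` hint and per property via a `moduleConfig` skip. Below is a rough sketch of the resulting Weaviate payloads, assuming the contextionary vectorizer configured in the workflow; the class and property names are examples and the exact contents of `_vectorizer_config` depend on the dlt configuration:

```python
# Class with at least one hinted column: the vectorizer settings are merged in
# at class level, and every column without the hint is skipped per property.
vectorized_class = {
    "class": "Content",
    "vectorizer": "text2vec-contextionary",
    "moduleConfig": {
        "text2vec-contextionary": {
            "vectorizeClassName": False,
            "vectorizePropertyName": True,
        }
    },
    "properties": [
        # hinted column: no skip entry, so the module vectorizes it
        {"name": "value", "dataType": ["text"]},
        # unhinted column: explicitly skipped for this vectorizer
        {
            "name": "_dlt_id",
            "dataType": ["text"],
            "moduleConfig": {"text2vec-contextionary": {"skip": True}},
        },
    ],
}

# Class with no hinted columns (including the sentinel class): the
# NON_VECTORIZED_CLASS settings are merged in instead.
non_vectorized_class = {
    "class": "DltSentinelClass",
    "properties": [],
    "vectorizer": "none",
    "vectorIndexConfig": {"skip": True},
}
```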