diff --git a/.github/workflows/docker-repo.yml b/.github/workflows/docker-repo.yml new file mode 100644 index 00000000..d221a12e --- /dev/null +++ b/.github/workflows/docker-repo.yml @@ -0,0 +1,49 @@ +name: Get ESDB Docker repo +on: + workflow_call: + inputs: + runtime_env: + description: The runtime environment we want to run like release or staging + type: string + default: release + + outputs: + docker_repo: + description: ESDB docker repository + value: ${{ jobs.provide_docker.outputs.docker_repo }} + + docker_container: + description: ESDB docker container + value: ${{ jobs.provide_docker.outputs.docker_container }} + +jobs: + provide_docker: + runs-on: ubuntu-latest + outputs: + docker_repo: ${{ steps.set_docker.outputs.docker_repo }} + docker_container: ${{ steps.set_docker.outputs.docker_container }} + steps: + - name: Set ESDB docker repo + id: set_docker + run: | + case ${{ inputs.runtime_env }} in + "release") + echo "docker_repo=eventstore-ce" >> $GITHUB_OUTPUT + echo "docker_container=eventstoredb-ce" >> $GITHUB_OUTPUT + ;; + + "staging") + echo "docker_repo=eventstore-staging-ce" >> $GITHUB_OUTPUT + echo "docker_container=eventstoredb-oss" >> $GITHUB_OUTPUT + ;; + + "enterprise") + echo "docker_repo=eventstore-ee" >> $GITHUB_OUTPUT + echo "docker_container=eventstoredb-commercial" >> $GITHUB_OUTPUT + ;; + + *) + echo "docker_repo=eventstore-ce" >> $GITHUB_OUTPUT + echo "docker_container=eventstoredb-ce" >> $GITHUB_OUTPUT + ;; + esac diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 263efb83..cc9161fc 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -27,7 +27,9 @@ jobs: run: make misc tests: - needs: build + needs: + - go-version + - build name: Tests strategy: diff --git a/.github/workflows/test-dispatch.yml b/.github/workflows/test-dispatch.yml index c65ee2a8..2ceac38f 100644 --- a/.github/workflows/test-dispatch.yml +++ b/.github/workflows/test-dispatch.yml @@ -3,8 +3,13 @@ name: "Dispatch" on: workflow_dispatch: inputs: + runtime_env: + description: The runtime environment. release, staging or enterprise + type: string + default: release + version: - description: "Version tag" + description: Docker version tag required: true type: string @@ -18,4 +23,5 @@ jobs: uses: ./.github/workflows/tests.yml with: esdb_version: ${{ inputs.version }} + runtime_env: ${{ inputs.runtime_env }} go_version: ${{ needs.go-version.outputs.go_version }} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a51aa3a4..93a916a5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,6 +3,10 @@ name: tests workflow on: workflow_call: inputs: + runtime_env: + type: string + default: release + esdb_version: required: true type: string @@ -12,13 +16,19 @@ on: type: string jobs: + provide_docker: + uses: ./.github/workflows/docker-repo.yml + with: + runtime_env: ${{ inputs.runtime_env }} + single_node: + needs: provide_docker name: Single node strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions, Expectations, Projections] + test: [Streams, PersistentSubscriptions, Projections] runs-on: ubuntu-latest steps: @@ -33,11 +43,14 @@ jobs: - name: Run Go Tests run: make ci CI_TARGET=Test${{ matrix.test }} env: - EVENTSTORE_DOCKER_TAG_ENV: ${{ inputs.esdb_version }} + ESDB_DOCKER_REPO: ${{ needs.provide_docker.outputs.docker_repo }} + ESDB_DOCKER_CONTAINER: ${{ needs.provide_docker.outputs.docker_container }} + ESDB_DOCKER_CONTAINER_VERSION: ${{ inputs.esdb_version }} EVENTSTORE_INSECURE: true secure: name: Secure + needs: provide_docker strategy: fail-fast: false @@ -58,11 +71,14 @@ jobs: run: make ci CI_TARGET=Test${{ matrix.test }} env: - EVENTSTORE_DOCKER_TAG_ENV: ${{ inputs.esdb_version }} + ESDB_DOCKER_REPO: ${{ needs.provide_docker.outputs.docker_repo }} + ESDB_DOCKER_CONTAINER: ${{ needs.provide_docker.outputs.docker_container }} + ESDB_DOCKER_CONTAINER_VERSION: ${{ inputs.esdb_version }} EVENTSTORE_INSECURE: false cluster: name: Cluster + needs: provide_docker strategy: fail-fast: false @@ -80,7 +96,9 @@ jobs: run: | docker compose -f cluster-docker-compose.yml up -d env: - EVENTSTORE_DOCKER_TAG_ENV: ${{ inputs.esdb_version }} + ESDB_DOCKER_REPO: ${{ needs.provide_docker.outputs.docker_repo }} + ESDB_DOCKER_CONTAINER: ${{ needs.provide_docker.outputs.docker_container }} + ESDB_DOCKER_CONTAINER_VERSION: ${{ inputs.esdb_version }} - name: Run Go Tests run: make ci CI_TARGET=Test${{ matrix.test }} diff --git a/Makefile b/Makefile index 406eafff..9bad9869 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ endif .PHONY: singleNode singleNode: ## Run tests against a single node. - @EVENTSTORE_INSECURE=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestExpectations|TestProjections' + @EVENTSTORE_INSECURE=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestProjections' .PHONY: secureNode secureNode: ## Run tests against a secure node. diff --git a/cluster-docker-compose.yml b/cluster-docker-compose.yml index bba68629..6e31e809 100644 --- a/cluster-docker-compose.yml +++ b/cluster-docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.5" - services: volumes-provisioner: image: hasnat/volumes-provisioner @@ -25,8 +23,8 @@ services: depends_on: - volumes-provisioner - esdb-node1: - image: docker.eventstore.com/eventstore-ce/eventstoredb-ce:${EVENTSTORE_DOCKER_TAG_ENV:-latest} + esdb-node1: &template + image: docker.eventstore.com/${ESDB_DOCKER_REPO:-eventstore-ce}/${ESDB_DOCKER_CONTAINER:-eventstoredb-ce}:${ESDB_DOCKER_CONTAINER_VERSION:-latest} env_file: - shared.env environment: @@ -47,9 +45,7 @@ services: - cert-gen esdb-node2: - image: docker.eventstore.com/eventstore-ce/eventstoredb-ce:${EVENTSTORE_DOCKER_TAG_ENV:-latest} - env_file: - - shared.env + <<: *template environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.13:2113 - EVENTSTORE_INT_IP=172.30.240.12 @@ -61,16 +57,9 @@ services: networks: clusternetwork: ipv4_address: 172.30.240.12 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen esdb-node3: - image: docker.eventstore.com/eventstore-ce/eventstoredb-ce:${EVENTSTORE_DOCKER_TAG_ENV:-latest} - env_file: - - shared.env + <<: *template environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.12:2113 - EVENTSTORE_INT_IP=172.30.240.13 @@ -82,11 +71,6 @@ services: networks: clusternetwork: ipv4_address: 172.30.240.13 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen networks: clusternetwork: diff --git a/esdb/append_test.go b/esdb/append_test.go index 0ee4865d..24ad782e 100644 --- a/esdb/append_test.go +++ b/esdb/append_test.go @@ -24,6 +24,16 @@ func createTestEvent() esdb.EventData { return event } +func createTestEvents(count int) []esdb.EventData { + var events []esdb.EventData + + for i := 0; i < count; i++ { + events = append(events, createTestEvent()) + } + + return events +} + func collectStreamEvents(stream *esdb.ReadStream) ([]*esdb.ResolvedEvent, error) { events := []*esdb.ResolvedEvent{} diff --git a/esdb/client.go b/esdb/client.go index 1b7aaaef..b9c3eb5e 100644 --- a/esdb/client.go +++ b/esdb/client.go @@ -32,6 +32,11 @@ func NewClient(configuration *Configuration) (*Client, error) { }, nil } +// Config Returns client's current configuration. +func (client *Client) Config() *Configuration { + return client.config +} + // Close Closes a connection and cleans all its allocated resources. func (client *Client) Close() error { client.grpcClient.close() diff --git a/esdb/client_test.go b/esdb/client_test.go index a98f333f..47b3fc19 100644 --- a/esdb/client_test.go +++ b/esdb/client_test.go @@ -15,16 +15,6 @@ func TestStreams(t *testing.T) { defer emptyClient.Close() } - populatedContainer, populatedClient := CreatePopulatedDatabase(t) - - if populatedContainer != nil { - defer populatedContainer.Close() - } - - if populatedClient != nil { - defer populatedClient.Close() - } - isCluster := GetEnvOrDefault("CLUSTER", "false") == "true" isInsecure := GetEnvOrDefault("EVENTSTORE_INSECURE", "true") == "true" @@ -33,8 +23,8 @@ func TestStreams(t *testing.T) { } AppendTests(t, emptyContainer, emptyClient) - ReadStreamTests(t, emptyClient, populatedClient) - SubscriptionTests(t, emptyClient, populatedClient) + ReadStreamTests(t, emptyClient) + SubscriptionTests(t, emptyClient) DeleteTests(t, emptyClient) ConnectionTests(t, emptyContainer) @@ -59,18 +49,8 @@ func TestPersistentSubscriptions(t *testing.T) { defer emptyClient.Close() } - populatedContainer, populatedClient := CreatePopulatedDatabase(t) - - if populatedContainer != nil { - defer populatedContainer.Close() - } - - if populatedClient != nil { - defer populatedClient.Close() - } - PersistentSubReadTests(t, emptyClient) - PersistentSubTests(t, emptyClient, populatedClient) + PersistentSubTests(t, emptyClient) } func TestProjections(t *testing.T) { @@ -87,20 +67,6 @@ func TestProjections(t *testing.T) { ProjectionTests(t, emptyClient) } -func TestExpectations(t *testing.T) { - populatedContainer, populatedClient := CreatePopulatedDatabase(t) - - if populatedContainer != nil { - defer populatedContainer.Close() - } - - if populatedClient != nil { - defer populatedClient.Close() - } - - ReadAllTests(t, populatedClient) -} - func TestMisc(t *testing.T) { ConnectionStringTests(t) TestPositionParsing(t) diff --git a/esdb/containers_test.go b/esdb/containers_test.go index 4e67e4aa..140b9dab 100644 --- a/esdb/containers_test.go +++ b/esdb/containers_test.go @@ -19,9 +19,10 @@ import ( ) const ( - EVENTSTORE_DOCKER_REPOSITORY_ENV = "EVENTSTORE_DOCKER_REPOSITORY" - EVENTSTORE_DOCKER_TAG_ENV = "EVENTSTORE_DOCKER_TAG_ENV" - EVENTSTORE_DOCKER_PORT_ENV = "EVENTSTORE_DOCKER_PORT" + ESDB_DOCKER_REPO_ENV = "ESDB_DOCKER_REPO" + ESDB_DOCKER_CONTAINER_ENV = "ESDB_DOCKER_CONTAINER" + ESDB_DOCKER_CONTAINER_VERSION_ENV = "ESDB_DOCKER_CONTAINER_VERSION" + EVENTSTORE_DOCKER_PORT_ENV = "EVENTSTORE_DOCKER_PORT" ) var ( @@ -42,14 +43,19 @@ type EventStoreDockerConfig struct { } const ( - DEFAULT_EVENTSTORE_DOCKER_REPOSITORY = "docker.eventstore.com/eventstore-utils/testdata" - DEFAULT_EVENTSTORE_DOCKER_TAG = "latest" - DEFAULT_EVENTSTORE_DOCKER_PORT = "2113" + DEFAULT_ESDB_DOCKER_REPO = "eventstore-ce" + DEFAULT_ESDB_DOCKER_CONTAINER = "eventstoredb-ce" + DEFAULT_ESDB_DOCKER_CONTAINER_VERSION = "latest" + DEFAULT_EVENTSTORE_DOCKER_PORT = "2113" ) +func fullDockerRepo(repo string, container string) string { + return fmt.Sprintf("docker.eventstore.com/%s/%s", repo, container) +} + var defaultEventStoreDockerConfig = EventStoreDockerConfig{ - Repository: DEFAULT_EVENTSTORE_DOCKER_REPOSITORY, - Tag: DEFAULT_EVENTSTORE_DOCKER_TAG, + Repository: fullDockerRepo(DEFAULT_ESDB_DOCKER_REPO, DEFAULT_ESDB_DOCKER_CONTAINER), + Tag: DEFAULT_ESDB_DOCKER_CONTAINER_VERSION, Port: DEFAULT_EVENTSTORE_DOCKER_PORT, } @@ -61,54 +67,17 @@ func GetEnvOrDefault(key, defaultValue string) string { } func readEnvironmentVariables(config EventStoreDockerConfig) EventStoreDockerConfig { - config.Repository = GetEnvOrDefault(EVENTSTORE_DOCKER_REPOSITORY_ENV, config.Repository) - config.Tag = GetEnvOrDefault(EVENTSTORE_DOCKER_TAG_ENV, config.Tag) + repo := GetEnvOrDefault(ESDB_DOCKER_REPO_ENV, DEFAULT_ESDB_DOCKER_REPO) + container := GetEnvOrDefault(ESDB_DOCKER_CONTAINER_ENV, DEFAULT_ESDB_DOCKER_CONTAINER) + + config.Repository = fullDockerRepo(repo, container) + config.Tag = GetEnvOrDefault(ESDB_DOCKER_CONTAINER_VERSION_ENV, config.Tag) config.Port = GetEnvOrDefault(EVENTSTORE_DOCKER_PORT_ENV, config.Port) fmt.Println(spew.Sdump(config)) return config } -type ESDBVersion struct { - Maj int - Min int - Patch int -} - -type VersionPredicateFn = func(ESDBVersion) bool - -func IsESDB_Version(predicate VersionPredicateFn) bool { - value, exists := os.LookupEnv(EVENTSTORE_DOCKER_TAG_ENV) - if !exists || value == "ci" { - return false - } - - parts := strings.Split(value, "-") - versionNumbers := strings.Split(parts[0], ".") - - version := ESDBVersion{ - Maj: mustConvertToInt(versionNumbers[0]), - Min: mustConvertToInt(versionNumbers[1]), - Patch: mustConvertToInt(versionNumbers[2]), - } - - return predicate(version) -} - -func mustConvertToInt(s string) int { - val, err := strconv.Atoi(s) - if err != nil { - panic(err) - } - return val -} - -func IsESDBVersion20() bool { - return IsESDB_Version(func(version ESDBVersion) bool { - return version.Maj < 21 - }) -} - func getContainerRequest() (*EventStoreDockerConfig, *testcontainers.ContainerRequest, error) { config := readEnvironmentVariables(defaultEventStoreDockerConfig) @@ -157,6 +126,7 @@ func getContainerRequest() (*EventStoreDockerConfig, *testcontainers.ContainerRe Files: files, WaitingFor: wait. ForHTTP("/health/live"). + WithPort(nat.Port(config.Port)). WithTLS(!insecure). WithStartupTimeout(1 * time.Minute). WithAllowInsecure(true). @@ -190,8 +160,12 @@ func getDatabase(t *testing.T, config EventStoreDockerConfig, req testcontainers t.Fatalf("error when looking up container mapped port %s: %v", config.Port, err) } + t.Logf("[debug] container got port %s mapped to %s", config.Port, port) + endpoint := fmt.Sprintf("localhost:%s", port.Port()) + t.Logf("[debug] endpoint is localhost:%s", port.Port()) + if !container.IsRunning() { t.Fatalf("failed to get a running container after many attempts") } @@ -254,51 +228,28 @@ func GetClient(t *testing.T, container *Container) *esdb.Client { } func CreateEmptyDatabase(t *testing.T) (*Container, *esdb.Client) { - return createDatabase(t, false) -} - -func CreatePopulatedDatabase(t *testing.T) (*Container, *esdb.Client) { - return createDatabase(t, true) -} - -func createDatabase(t *testing.T, populated bool) (*Container, *esdb.Client) { isInsecure := GetEnvOrDefault("EVENTSTORE_INSECURE", "true") == "true" - var label string - - if populated { - label = "populated" - } else { - label = "empty" - } - var container *Container var client *esdb.Client if GetEnvOrDefault("CLUSTER", "false") == "true" { - // When run on the cluster configuration we don't run the pre-populated database, so we have no use for a client - // either. - if !populated { - client = GetClient(t, nil) - } + client = GetClient(t, nil) } else { if isInsecure { - t.Logf("[debug] starting %s insecure database container...", label) + t.Logf("[debug] starting insecure database container...") } else { - t.Logf("[debug] starting %s database container...", label) + t.Logf("[debug] starting database container...") } config, req, err := getContainerRequest() + fmt.Println(spew.Sdump(req)) + if err != nil { t.Fatalf("error when constructing testcontainer request: %v", err) } - if populated { - req.Env["EVENTSTORE_DB"] = "/data/integration-tests" - req.Env["EVENTSTORE_MEM_DB"] = "false" - } - container = getDatabase(t, *config, *req) client = GetClient(t, container) @@ -313,11 +264,15 @@ func createDatabase(t *testing.T, populated bool) (*Container, *esdb.Client) { } func createTestClient(conn string, container *Container, t *testing.T) *esdb.Client { + t.Logf("[debug] connection string => %s", conn) + config, err := esdb.ParseConnectionString(conn) if err != nil { t.Fatalf("Unexpected configuration error: %s", err.Error()) } + fmt.Println(spew.Sdump(config)) + client, err := esdb.NewClient(config) if err != nil { t.Fatalf("Unexpected failure setting up test connection: %s", err.Error()) @@ -328,7 +283,7 @@ func createTestClient(conn string, container *Container, t *testing.T) *esdb.Cli func WaitForAdminToBeAvailable(t *testing.T, db *esdb.Client) { for count := 0; count < 50; count++ { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Logf("[debug] checking if admin user is available...%v/50", count) stream, err := db.ReadStream(ctx, "$users", esdb.ReadStreamOptions{}, 1) @@ -352,6 +307,14 @@ func WaitForAdminToBeAvailable(t *testing.T, db *esdb.Client) { if err != nil { if esdbError, ok := esdb.FromError(err); !ok { + // If we are in insecure mode, the $users stream is not available. + if db.Config().DisableTLS && esdbError.Code() == esdb.ErrorCodeResourceNotFound { + t.Log("[debug] admin is available!") + cancel() + stream.Close() + return + } + if esdbError.Code() == esdb.ErrorCodeResourceNotFound || esdbError.Code() == esdb.ErrorCodeUnauthenticated || esdbError.Code() == esdb.ErrorCodeDeadlineExceeded || diff --git a/esdb/persistent_subscription_read_test.go b/esdb/persistent_subscription_read_test.go index 15c02b1b..8a815c7a 100644 --- a/esdb/persistent_subscription_read_test.go +++ b/esdb/persistent_subscription_read_test.go @@ -499,12 +499,6 @@ func persistentSubscriptionToAll_Read(clientInstance *esdb.Client) TestCall { }, ) - if err, ok := esdb.FromError(err); !ok { - if err.Code() == esdb.ErrorCodeUnsupportedFeature && IsESDBVersion20() { - t.Skip() - } - } - require.NoError(t, err) readConnectionClient, err := clientInstance.SubscribeToPersistentSubscriptionToAll( diff --git a/esdb/persistent_subscriptions_test.go b/esdb/persistent_subscriptions_test.go index 60f0ad7b..8883d106 100644 --- a/esdb/persistent_subscriptions_test.go +++ b/esdb/persistent_subscriptions_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -func PersistentSubTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBClient *esdb.Client) { +func PersistentSubTests(t *testing.T, emptyDBClient *esdb.Client) { t.Run("PersistentSubTests", func(t *testing.T) { t.Run("createPersistentStreamSubscription", createPersistentStreamSubscription(emptyDBClient)) t.Run("createPersistentStreamSubscription_MessageTimeoutZero", createPersistentStreamSubscription_MessageTimeoutZero(emptyDBClient)) @@ -23,7 +23,6 @@ func PersistentSubTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBCli t.Run("updatePersistentStreamSubscription_ErrIfSubscriptionDoesNotExist", updatePersistentStreamSubscription_ErrIfSubscriptionDoesNotExist(emptyDBClient)) t.Run("deletePersistentStreamSubscription", deletePersistentStreamSubscription(emptyDBClient)) t.Run("deletePersistentSubscription_ErrIfSubscriptionDoesNotExist", deletePersistentSubscription_ErrIfSubscriptionDoesNotExist(emptyDBClient)) - t.Run("testPersistentSubscriptionClosing", testPersistentSubscriptionClosing(populatedDBClient)) t.Run("persistentAllCreate", persistentAllCreate(emptyDBClient)) t.Run("persistentAllUpdate", persistentAllUpdate(emptyDBClient)) t.Run("persistentAllDelete", persistentAllDelete(emptyDBClient)) @@ -254,66 +253,6 @@ func pushEventsToStream(t *testing.T, require.NoError(t, err) } -func testPersistentSubscriptionClosing(db *esdb.Client) TestCall { - return func(t *testing.T) { - if db == nil { - t.Skip() - } - - streamID := "dataset20M-0" - groupName := "Group 1" - - err := db.CreatePersistentSubscription(context.Background(), streamID, groupName, esdb.PersistentStreamSubscriptionOptions{ - StartFrom: esdb.Start{}, - }) - - require.NoError(t, err) - - var receivedEvents sync.WaitGroup - var droppedEvent sync.WaitGroup - - subscription, err := db.SubscribeToPersistentSubscription( - context.Background(), streamID, groupName, esdb.SubscribeToPersistentSubscriptionOptions{ - BufferSize: 2, - }) - - require.NoError(t, err) - - go func() { - current := 1 - - for { - subEvent := subscription.Recv() - - if subEvent.EventAppeared != nil { - if current <= 10 { - receivedEvents.Done() - current++ - } - - subscription.Ack(subEvent.EventAppeared.Event) - - continue - } - - if subEvent.SubscriptionDropped != nil { - droppedEvent.Done() - break - } - } - }() - - require.NoError(t, err) - receivedEvents.Add(10) - droppedEvent.Add(1) - timedOut := waitWithTimeout(&receivedEvents, time.Duration(5)*time.Second) - require.False(t, timedOut, "Timed out waiting for initial set of events") - subscription.Close() - timedOut = waitWithTimeout(&droppedEvent, time.Duration(5)*time.Second) - require.False(t, timedOut, "Timed out waiting for dropped event") - } -} - func persistentAllCreate(client *esdb.Client) TestCall { return func(t *testing.T) { groupName := NAME_GENERATOR.Generate() @@ -324,12 +263,6 @@ func persistentAllCreate(client *esdb.Client) TestCall { esdb.PersistentAllSubscriptionOptions{}, ) - if err, ok := esdb.FromError(err); !ok { - if err.Code() == esdb.ErrorCodeUnsupportedFeature && IsESDBVersion20() { - t.Skip() - } - } - require.NoError(t, err) } } @@ -344,12 +277,6 @@ func persistentAllUpdate(client *esdb.Client) TestCall { esdb.PersistentAllSubscriptionOptions{}, ) - if err, ok := esdb.FromError(err); !ok { - if err.Code() == esdb.ErrorCodeUnsupportedFeature && IsESDBVersion20() { - t.Skip() - } - } - require.NoError(t, err) setts := esdb.SubscriptionSettingsDefault() @@ -373,12 +300,6 @@ func persistentAllDelete(client *esdb.Client) TestCall { esdb.PersistentAllSubscriptionOptions{}, ) - if err, ok := esdb.FromError(err); !ok { - if err.Code() == esdb.ErrorCodeUnsupportedFeature && IsESDBVersion20() { - t.Skip() - } - } - require.NoError(t, err) err = client.DeletePersistentSubscriptionToAll(context.Background(), groupName, esdb.DeletePersistentSubscriptionOptions{}) diff --git a/esdb/read_all_test.go b/esdb/read_all_test.go deleted file mode 100644 index d0cd8b34..00000000 --- a/esdb/read_all_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package esdb_test - -import ( - "context" - "encoding/json" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/EventStore/EventStore-Client-Go/v4/esdb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func ReadAllTests(t *testing.T, populatedDBClient *esdb.Client) { - t.Run("ReadAllTests", func(t *testing.T) { - t.Run("readAllEventsForwardsFromZeroPosition(", readAllEventsForwardsFromZeroPosition(populatedDBClient)) - t.Run("readAllEventsForwardsFromNonZeroPosition", readAllEventsForwardsFromNonZeroPosition(populatedDBClient)) - t.Run("readAllEventsBackwardsFromZeroPosition", readAllEventsBackwardsFromZeroPosition(populatedDBClient)) - t.Run("readAllEventsBackwardsFromNonZeroPosition", readAllEventsBackwardsFromNonZeroPosition(populatedDBClient)) - t.Run("readAllEventsWithCredentialsOverride", readAllEventsWithCredentialOverride(populatedDBClient)) - }) -} - -func readAllEventsForwardsFromZeroPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - eventsContent, err := os.ReadFile("../resources/test/all-e0-e10.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - opts := esdb.ReadAllOptions{ - Direction: esdb.Forwards, - From: esdb.Start{}, - ResolveLinkTos: true, - } - stream, err := db.ReadAll(context, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i].OriginalEvent().CreatedDate.Unix()) - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i].OriginalEvent().Position.Prepare) - assert.Equal(t, testEvents[i].Event.ContentType, events[i].OriginalEvent().ContentType) - } - } -} - -func readAllEventsForwardsFromNonZeroPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - eventsContent, err := ioutil.ReadFile("../resources/test/all-c1788-p1788.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - opts := esdb.ReadAllOptions{ - From: esdb.Position{Commit: 1_788, Prepare: 1_788}, - ResolveLinkTos: true, - } - - stream, err := db.ReadAll(context, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i].OriginalEvent().CreatedDate.Unix()) - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i].OriginalEvent().Position.Prepare) - assert.Equal(t, testEvents[i].Event.ContentType, events[i].OriginalEvent().ContentType) - } - } -} - -func readAllEventsBackwardsFromZeroPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - eventsContent, err := os.ReadFile("../resources/test/all-back-e0-e10.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - opts := esdb.ReadAllOptions{ - From: esdb.End{}, - Direction: esdb.Backwards, - ResolveLinkTos: true, - } - - // We read 30 more events in case the DB had pushed more config related events before the test begins. - stream, err := db.ReadAll(context, opts, numberOfEvents+30) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - offset := 0 - - // We remove potential events that were added by the server on startup, causing the expected event list to be - // misaligned. - for i := 0; i < numberOfEventsToRead; i++ { - if events[i].OriginalEvent().CreatedDate.Year() == 2020 { - break - } - - offset += 1 - } - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i+offset].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i+offset].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i+offset].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i+offset].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i+offset].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i+offset].OriginalEvent().CreatedDate.Unix()) - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i+offset].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i+offset].OriginalEvent().Position.Prepare) - assert.Equal(t, testEvents[i].Event.ContentType, events[i+offset].OriginalEvent().ContentType) - } - } -} - -func readAllEventsBackwardsFromNonZeroPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - eventsContent, err := ioutil.ReadFile("../resources/test/all-back-c3386-p3386.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - opts := esdb.ReadAllOptions{ - From: esdb.Position{Commit: 3_386, Prepare: 3_386}, - Direction: esdb.Backwards, - ResolveLinkTos: true, - } - - stream, err := db.ReadAll(context, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - assert.Equal(t, numberOfEvents, uint64(len(events)), "Expected the correct number of messages to be returned") - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i].OriginalEvent().CreatedDate.Unix()) - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i].OriginalEvent().Position.Prepare) - assert.Equal(t, testEvents[i].Event.ContentType, events[i].OriginalEvent().ContentType) - } - } -} - -func readAllEventsWithCredentialOverride(db *esdb.Client) TestCall { - return func(t *testing.T) { - eventsContent, err := ioutil.ReadFile("../resources/test/all-back-c3386-p3386.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - opts := esdb.ReadAllOptions{ - Authenticated: &esdb.Credentials{ - Login: "admin", - Password: "changeit", - }, - From: esdb.Position{Commit: 3_386, Prepare: 3_386}, - Direction: esdb.Forwards, - ResolveLinkTos: false, - } - - stream, err := db.ReadAll(context, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - // collect all events to see if no error occurs - _, err = collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - } -} diff --git a/esdb/read_stream_test.go b/esdb/read_stream_test.go index 5f8d9024..0b5a3ac9 100644 --- a/esdb/read_stream_test.go +++ b/esdb/read_stream_test.go @@ -2,12 +2,9 @@ package esdb_test import ( "context" - "encoding/json" "errors" "github.com/google/uuid" "io" - "io/ioutil" - "math" "testing" "time" @@ -41,10 +38,8 @@ type Event struct { Created Created `json:"created"` } -func ReadStreamTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBClient *esdb.Client) { +func ReadStreamTests(t *testing.T, emptyDBClient *esdb.Client) { t.Run("ReadStreamTests", func(t *testing.T) { - t.Run("readStreamEventsForwardsFromZeroPosition", readStreamEventsForwardsFromZeroPosition(populatedDBClient)) - t.Run("readStreamEventsBackwardsFromEndPosition", readStreamEventsBackwardsFromEndPosition(populatedDBClient)) t.Run("readStreamReturnsEOFAfterCompletion", readStreamReturnsEOFAfterCompletion(emptyDBClient)) t.Run("readStreamNotFound", readStreamNotFound(emptyDBClient)) t.Run("readStreamWithMaxAge", readStreamWithMaxAge(emptyDBClient)) @@ -52,144 +47,6 @@ func ReadStreamTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBClient }) } -func readStreamEventsForwardsFromZeroPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - if db == nil { - t.Skip() - } - - eventsContent, err := ioutil.ReadFile("../resources/test/dataset20M-1800-e0-e10.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - streamId := "dataset20M-1800" - - opts := esdb.ReadStreamOptions{ - Direction: esdb.Forwards, - ResolveLinkTos: true, - } - - stream, err := db.ReadStream(context, streamId, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - assert.Equal(t, numberOfEvents, uint64(len(events)), "Expected the correct number of messages to be returned") - - serverVersion, err := db.GetServerVersion() - if err != nil { - t.Fatalf("Failed to retrieve server version %+v", err) - } - - isAtLeast22_6 := serverVersion.Major == 22 && serverVersion.Minor >= 6 || serverVersion.Major > 22 - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i].OriginalEvent().CreatedDate.Unix()) - if isAtLeast22_6 { - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i].OriginalEvent().Position.Prepare) - } else { - assert.Equal(t, uint64(math.MaxUint64), events[i].OriginalEvent().Position.Commit) - assert.Equal(t, uint64(math.MaxUint64), events[i].OriginalEvent().Position.Prepare) - } - - assert.Equal(t, testEvents[i].Event.ContentType, events[i].OriginalEvent().ContentType) - } - } -} - -func readStreamEventsBackwardsFromEndPosition(db *esdb.Client) TestCall { - return func(t *testing.T) { - if db == nil { - t.Skip() - } - - eventsContent, err := ioutil.ReadFile("../resources/test/dataset20M-1800-e1999-e1990.json") - require.NoError(t, err) - - var testEvents []TestEvent - err = json.Unmarshal(eventsContent, &testEvents) - - require.NoError(t, err) - - context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) - defer cancel() - - numberOfEventsToRead := 10 - numberOfEvents := uint64(numberOfEventsToRead) - - streamId := "dataset20M-1800" - opts := esdb.ReadStreamOptions{ - Direction: esdb.Backwards, - From: esdb.End{}, - ResolveLinkTos: true, - } - - stream, err := db.ReadStream(context, streamId, opts, numberOfEvents) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - defer stream.Close() - - events, err := collectStreamEvents(stream) - - if err != nil { - t.Fatalf("Unexpected failure %+v", err) - } - - assert.Equal(t, numberOfEvents, uint64(len(events)), "Expected the correct number of messages to be returned") - - serverVersion, err := db.GetServerVersion() - if err != nil { - t.Fatalf("Failed to retrieve server version %+v", err) - } - - isAtLeast22_6 := serverVersion.Major == 22 && serverVersion.Minor >= 6 || serverVersion.Major > 22 - - for i := 0; i < numberOfEventsToRead; i++ { - assert.Equal(t, testEvents[i].Event.EventID, events[i].OriginalEvent().EventID) - assert.Equal(t, testEvents[i].Event.EventType, events[i].OriginalEvent().EventType) - assert.Equal(t, testEvents[i].Event.StreamID, events[i].OriginalEvent().StreamID) - assert.Equal(t, testEvents[i].Event.StreamRevision.Value, events[i].OriginalEvent().EventNumber) - assert.Equal(t, testEvents[i].Event.Created.Nanos, events[i].OriginalEvent().CreatedDate.Nanosecond()) - assert.Equal(t, testEvents[i].Event.Created.Seconds, events[i].OriginalEvent().CreatedDate.Unix()) - if isAtLeast22_6 { - assert.Equal(t, testEvents[i].Event.Position.Commit, events[i].OriginalEvent().Position.Commit) - assert.Equal(t, testEvents[i].Event.Position.Prepare, events[i].OriginalEvent().Position.Prepare) - } else { - assert.Equal(t, uint64(math.MaxUint64), events[i].OriginalEvent().Position.Commit) - assert.Equal(t, uint64(math.MaxUint64), events[i].OriginalEvent().Position.Prepare) - } - assert.Equal(t, testEvents[i].Event.ContentType, events[i].OriginalEvent().ContentType) - } - } -} - func readStreamReturnsEOFAfterCompletion(db *esdb.Client) TestCall { return func(t *testing.T) { proposedEvents := []esdb.EventData{} diff --git a/esdb/subscriptions_test.go b/esdb/subscriptions_test.go index f330a37f..74660823 100644 --- a/esdb/subscriptions_test.go +++ b/esdb/subscriptions_test.go @@ -2,9 +2,6 @@ package esdb_test import ( "context" - "encoding/json" - "github.com/google/uuid" - "io/ioutil" "strings" "sync" "testing" @@ -14,20 +11,18 @@ import ( "github.com/stretchr/testify/require" ) -func SubscriptionTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBClient *esdb.Client) { +func SubscriptionTests(t *testing.T, emptyDBClient *esdb.Client) { t.Run("SubscriptionTests", func(t *testing.T) { - t.Run("streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents", streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(populatedDBClient)) - t.Run("allSubscriptionWithFilterDeliversCorrectEvents", allSubscriptionWithFilterDeliversCorrectEvents(populatedDBClient)) + t.Run("streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents", streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(emptyDBClient)) t.Run("subscriptionAllFilter", subscriptionAllFilter(emptyDBClient)) - t.Run("connectionClosing", connectionClosing(populatedDBClient)) - t.Run("subscriptionAllWithCredentialsOverride", subscriptionAllWithCredentialsOverride(populatedDBClient)) - t.Run("subscriptionToStreamCaughtUpMessage", subscriptionToStreamCaughtUpMessage(populatedDBClient)) + t.Run("connectionClosing", connectionClosing(emptyDBClient)) + t.Run("subscriptionToStreamCaughtUpMessage", subscriptionToStreamCaughtUpMessage(emptyDBClient)) }) } func subscriptionToStreamCaughtUpMessage(db *esdb.Client) TestCall { const minSupportedVersion = 23 - const expectedEventCount = 6_000 + const expectedEventCount = 16 const testTimeout = 1 * time.Minute return func(t *testing.T) { @@ -45,7 +40,12 @@ func subscriptionToStreamCaughtUpMessage(db *esdb.Client) TestCall { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - streamID := "dataset20M-0" + streamID := NAME_GENERATOR.Generate() + testEvents := createTestEvents(expectedEventCount) + + _, err = db.AppendToStream(context.Background(), streamID, esdb.AppendToStreamOptions{}, testEvents...) + require.NoError(t, err) + subscription, err := db.SubscribeToStream(ctx, streamID, esdb.SubscribeToStreamOptions{From: esdb.Start{}}) require.NoError(t, err) defer subscription.Close() @@ -91,47 +91,39 @@ func subscriptionToStreamCaughtUpMessage(db *esdb.Client) TestCall { func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *esdb.Client) TestCall { return func(t *testing.T) { - if db == nil { - t.Skip() - } + streamID := NAME_GENERATOR.Generate() + eventCount := 64 + testEvents := createTestEvents(eventCount) - streamID := "dataset20M-0" - testEvent := createTestEvent() - testEvent.EventID = uuid.MustParse("84c8e36c-4e64-11ea-8b59-b7f658acfc9f") + writeResult, err := db.AppendToStream(context.Background(), streamID, esdb.AppendToStreamOptions{}, testEvents...) + require.NoError(t, err) var receivedEvents sync.WaitGroup var appendedEvents sync.WaitGroup - subscription, err := db.SubscribeToStream(context.Background(), "dataset20M-0", esdb.SubscribeToStreamOptions{ + subscription, err := db.SubscribeToStream(context.Background(), streamID, esdb.SubscribeToStreamOptions{ From: esdb.Start{}, }) require.NoError(t, err) defer subscription.Close() - receivedEvents.Add(6_000) + receivedEvents.Add(eventCount) appendedEvents.Add(1) go func() { - current := 0 - for { + for i := 0; i < eventCount; { subEvent := subscription.Recv() - if subEvent.EventAppeared != nil { - current++ - if current <= 6_000 { - receivedEvents.Done() - continue - } - - event := subEvent.EventAppeared - require.Equal(t, testEvent.EventID, event.OriginalEvent().EventID) - require.Equal(t, uint64(6_000), event.OriginalEvent().EventNumber) - require.Equal(t, streamID, event.OriginalEvent().StreamID) - require.Equal(t, testEvent.Data, event.OriginalEvent().Data) - require.Equal(t, testEvent.Metadata, event.OriginalEvent().UserMetadata) + if subEvent.SubscriptionDropped != nil { break } + + if subEvent.EventAppeared != nil { + i++ + receivedEvents.Done() + } } + appendedEvents.Done() }() @@ -140,11 +132,10 @@ func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *esdb. // Write a new event opts2 := esdb.AppendToStreamOptions{ - ExpectedRevision: esdb.Revision(5_999), + ExpectedRevision: esdb.Revision(writeResult.NextExpectedVersion), } - writeResult, err := db.AppendToStream(context.Background(), streamID, opts2, testEvent) + _, err = db.AppendToStream(context.Background(), streamID, opts2, createTestEvent()) require.NoError(t, err) - require.Equal(t, uint64(6_000), writeResult.NextExpectedVersion) // Assert event was forwarded to the subscription timedOut = waitWithTimeout(&appendedEvents, time.Duration(5)*time.Second) @@ -157,66 +148,6 @@ type Position struct { Commit uint64 `json:"commit"` } -func allSubscriptionWithFilterDeliversCorrectEvents(db *esdb.Client) TestCall { - return func(t *testing.T) { - if db == nil { - t.Skip() - } - - positionsContent, err := ioutil.ReadFile("../resources/test/all-positions-filtered-stream-194-e0-e30.json") - require.NoError(t, err) - versionsContent, err := ioutil.ReadFile("../resources/test/all-versions-filtered-stream-194-e0-e30.json") - require.NoError(t, err) - var positions []Position - var versions []uint64 - err = json.Unmarshal(positionsContent, &positions) - require.NoError(t, err) - err = json.Unmarshal(versionsContent, &versions) - require.NoError(t, err) - - var receivedEvents sync.WaitGroup - receivedEvents.Add(1) - - subscription, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{ - From: esdb.Start{}, - Filter: &esdb.SubscriptionFilter{ - Type: esdb.EventFilterType, - Prefixes: []string{"eventType-194"}, - }, - }) - - defer subscription.Close() - - go func() { - current := 0 - for current <= len(versions)-1 { - subEvent := subscription.Recv() - - if subEvent.SubscriptionDropped != nil { - break - } - - if subEvent.EventAppeared != nil { - event := subEvent.EventAppeared - - require.Equal(t, versions[current], event.OriginalEvent().EventNumber) - require.Equal(t, positions[current].Commit, event.OriginalEvent().Position.Commit) - require.Equal(t, positions[current].Prepare, event.OriginalEvent().Position.Prepare) - current++ - } - } - - if current > len(versions)-1 { - receivedEvents.Done() - } - }() - - require.NoError(t, err) - timedOut := waitWithTimeout(&receivedEvents, time.Duration(5)*time.Second) - require.False(t, timedOut, "Timed out while waiting for events via the subscription") - } -} - func subscriptionAllFilter(db *esdb.Client) TestCall { return func(t *testing.T) { sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{From: esdb.Start{}, Filter: esdb.ExcludeSystemEventsFilter()}) @@ -240,6 +171,10 @@ func subscriptionAllFilter(db *esdb.Client) TestCall { completed.Done() break } + + if event.SubscriptionDropped != nil { + break + } } }() @@ -250,18 +185,21 @@ func subscriptionAllFilter(db *esdb.Client) TestCall { func connectionClosing(db *esdb.Client) TestCall { return func(t *testing.T) { - if db == nil { - t.Skip() - } + streamID := NAME_GENERATOR.Generate() + eventCount := 128 + testEvents := createTestEvents(eventCount) + + _, err := db.AppendToStream(context.Background(), streamID, esdb.AppendToStreamOptions{}, testEvents...) + require.NoError(t, err) var droppedEvent sync.WaitGroup - subscription, err := db.SubscribeToStream(context.Background(), "dataset20M-0", esdb.SubscribeToStreamOptions{ + subscription, err := db.SubscribeToStream(context.Background(), streamID, esdb.SubscribeToStreamOptions{ From: esdb.Start{}, }) go func() { - current := 1 + current := 0 for { subEvent := subscription.Recv() @@ -291,28 +229,6 @@ func connectionClosing(db *esdb.Client) TestCall { } } -func subscriptionAllWithCredentialsOverride(db *esdb.Client) TestCall { - return func(t *testing.T) { - if db == nil { - t.Skip() - } - - opts := esdb.SubscribeToAllOptions{ - Authenticated: &esdb.Credentials{ - Login: "admin", - Password: "changeit", - }, - From: esdb.Start{}, - Filter: esdb.ExcludeSystemEventsFilter(), - } - _, err := db.SubscribeToAll(context.Background(), opts) - - if err != nil { - t.Error(err) - } - } -} - func waitWithTimeout(wg *sync.WaitGroup, duration time.Duration) bool { channel := make(chan struct{}) go func() {