diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index 286151c5251..637fc1f5df9 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -6,6 +6,7 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits]
 [float]
 ==== Breaking Changes
 - The unsupported apm_data_stream_migration ingest pipeline has been removed {pull}12102[12102].
+- "publish_ready" is always false in the "GET /" response until events are received by apm-server {pull}12150[12150]
 
 [float]
 ==== Bug fixes
@@ -23,3 +24,4 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits]
 - Add support for returning partial success response in OTLP input {pull}11883[11883]
 - Setting event timestamp from OTel observed timestamp when needed {pull}11935[11935]
 - Field mappings have been added for various formerly unindexed fields {pull}12102[12102]
+- We now assert that index templates are installed by attempting to create data streams {pull}12150[12150]
diff --git a/internal/beater/beater.go b/internal/beater/beater.go
index aee736e3fa1..330753510dd 100644
--- a/internal/beater/beater.go
+++ b/internal/beater/beater.go
@@ -25,6 +25,7 @@ import (
 	"net/http"
 	"os"
 	"runtime"
+	"sync"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -42,11 +43,9 @@ import (
 	_ "google.golang.org/grpc/encoding/gzip"
 
 	"github.com/elastic/beats/v7/libbeat/beat"
-	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
 	"github.com/elastic/beats/v7/libbeat/instrumentation"
 	"github.com/elastic/beats/v7/libbeat/licenser"
 	"github.com/elastic/beats/v7/libbeat/outputs"
-	esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
 	"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
 	"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
 	agentconfig "github.com/elastic/elastic-agent-libs/config"
@@ -347,7 +346,13 @@ func (s *Runner) Run(ctx context.Context) error {
 	// any events to Elasticsearch before the integration is ready.
 	publishReady := make(chan struct{})
 	drain := make(chan struct{})
+	startWaitReady := make(chan struct{})
+	var waitReadyOnce sync.Once
 	g.Go(func() error {
+		select {
+		case <-ctx.Done():
+		case <-startWaitReady:
+		}
 		if err := s.waitReady(ctx, kibanaClient, tracer); err != nil {
 			// One or more preconditions failed; drop events.
 			close(drain)
@@ -358,24 +363,25 @@ func (s *Runner) Run(ctx context.Context) error {
 		close(publishReady)
 		return nil
 	})
-	callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
+	prePublish := func(ctx context.Context) error {
+		waitReadyOnce.Do(func() {
+			close(startWaitReady)
+		})
 		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-drain:
+			return errServerShuttingDown
 		case <-publishReady:
-			return nil
-		default:
 		}
-		return errors.New("not ready for publishing events")
-	})
-	if err != nil {
-		return err
+		return nil
 	}
-	defer esoutput.DeregisterConnectCallback(callbackUUID)
 	newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
 		httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
 		if err != nil {
 			return nil, err
 		}
-		transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
+		transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish}
 		return elasticsearch.NewClientParams(elasticsearch.ClientParams{
 			Config:    cfg,
 			Transport: transport,
@@ -432,7 +438,7 @@ func (s *Runner) Run(ctx context.Context) error {
 	// Create the BatchProcessor chain that is used to process all events,
 	// including the metrics aggregated by APM Server.
 	finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
-		tracer, newElasticsearchClient, memLimitGB,
+		tracer, newElasticsearchClient, memLimitGB, prePublish,
 	)
 	if err != nil {
 		return err
@@ -647,7 +653,9 @@ func (s *Runner) waitReady(
 			return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
 		}
 		preconditions = append(preconditions, func(ctx context.Context) error {
-			return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
+			return checkIndexTemplatesInstalled(
+				ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger,
+			)
 		})
 	}
 
@@ -672,12 +680,13 @@ func (s *Runner) newFinalBatchProcessor(
 	tracer *apm.Tracer,
 	newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
 	memLimit float64,
+	prePublish func(context.Context) error,
 ) (modelpb.BatchProcessor, func(context.Context) error, error) {
 
 	monitoring.Default.Remove("libbeat")
 	libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
 	if s.elasticsearchOutputConfig == nil {
-		return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry)
+		return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry)
 	}
 
 	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
@@ -829,6 +838,7 @@ func docappenderConfig(
 
 func (s *Runner) newLibbeatFinalBatchProcessor(
 	tracer *apm.Tracer,
+	prePublish func(context.Context) error,
 	libbeatMonitoringRegistry *monitoring.Registry,
 ) (modelpb.BatchProcessor, func(context.Context) error, error) {
 	// When the publisher stops cleanly it will close its pipeline client,
@@ -889,7 +899,13 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
 		}
 		return acker.Wait(ctx)
 	}
-	return publisher, stop, nil
+	processor := modelprocessor.Chained{
+		modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
+			return prePublish(ctx)
+		}),
+		publisher,
+	}
+	return processor, stop, nil
 }
 
 const sourcemapIndex = ".apm-source-map"
diff --git a/internal/beater/beatertest/server.go b/internal/beater/beatertest/server.go
index 9812e3cb6e7..63109ac25ae 100644
--- a/internal/beater/beatertest/server.go
+++ b/internal/beater/beatertest/server.go
@@ -108,7 +108,8 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
 	require.NoError(t, err)
 	if !outputConfig.Output.IsSet() {
 		err = cfg.Merge(map[string]any{
-			"output.null": map[string]any{},
+			"output.null":     map[string]any{},
+			"queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"},
 		})
 		require.NoError(t, err)
 	}
diff --git a/internal/beater/checkintegration.go b/internal/beater/checkintegration.go
index f0fe62ebc59..a86edb4b120 100644
--- a/internal/beater/checkintegration.go
+++ b/internal/beater/checkintegration.go
@@ -24,23 +24,26 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"strings"
 
 	"github.com/pkg/errors"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/elastic/elastic-agent-libs/logp"
 
 	"github.com/elastic/apm-server/internal/elasticsearch"
 	"github.com/elastic/apm-server/internal/kibana"
-	"github.com/elastic/go-elasticsearch/v8/esapi"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
 )
 
-// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana
-// and/or Elasticsearch, returning nil if and only if it is installed.
-func checkIntegrationInstalled(
+// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the
+// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch,
+// returning nil if and only if it is installed.
+func checkIndexTemplatesInstalled(
 	ctx context.Context,
 	kibanaClient *kibana.Client,
 	esClient *elasticsearch.Client,
+	namespace string,
 	logger *logp.Logger,
 ) (err error) {
 	defer func() {
@@ -53,37 +56,36 @@ func checkIntegrationInstalled(
 			}
 		}
 	}()
+	if esClient != nil {
+		installed, err := checkCreateDataStream(ctx, esClient, namespace)
+		if err != nil {
+			return fmt.Errorf("error checking Elasticsearch index template setup: %w", err)
+		}
+		if !installed {
+			return errors.New("index templates not installed")
+		}
+		return nil
+	}
 	if kibanaClient != nil {
-		installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
+		installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger)
 		if err != nil {
 			// We only return the Kibana error if we have no Elasticsearch client,
 			// as we may not have sufficient privileges to query the Fleet API.
 			if esClient == nil {
 				return fmt.Errorf("error querying Kibana for integration package status: %w", err)
 			}
-		} else if !installed {
+		}
+		if !installed {
 			// We were able to query Kibana, but the package is not yet installed.
-			// We should continue querying the package status via Kibana, as it is
-			// more authoritative than checking for index template installation.
 			return errors.New("integration package not yet installed")
 		}
-		// Fall through and query Elasticsearch (if we have a client). Kibana may prematurely
-		// report packages as installed: https://github.com/elastic/kibana/issues/108649
-	}
-	if esClient != nil {
-		installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
-		if err != nil {
-			return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err)
-		} else if !installed {
-			return errors.New("integration index templates not installed")
-		}
 	}
 	return nil
 }
 
 // checkIntegrationInstalledKibana checks if the APM integration package
 // is installed by querying Kibana.
-func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
+func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
 	resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
 	if err != nil {
 		return false, err
@@ -106,41 +108,22 @@ func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.C
 	return result.Response.Status == "installed", nil
 }
 
-func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) {
-	// TODO(axw) generate the list of expected index templates.
-	templates := []string{
-		"traces-apm",
-		"traces-apm.sampled",
-		"metrics-apm.app",
-		"metrics-apm.internal",
-		"logs-apm.error",
-	}
-	for _, intervals := range []string{"1m", "10m", "60m"} {
-		for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} {
-			templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals))
-		}
-	}
-	// IndicesGetIndexTemplateRequest accepts a slice of template names,
-	// but the REST API expects just one index template name. Query them
-	// in parallel.
-	g, ctx := errgroup.WithContext(ctx)
-	for _, template := range templates {
-		template := template // copy for closure
-		g.Go(func() error {
-			req := esapi.IndicesGetIndexTemplateRequest{Name: template}
-			resp, err := req.Do(ctx, esClient)
-			if err != nil {
-				return err
+// checkCreateDataStream attempts to create a traces-apm-<namespace> data stream,
+// returning an error if it could not be created. This will fail if there is no
+// index template matching the pattern.
+func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) {
+	if _, err := createdatastream.New(esClient).Name("traces-apm-" + namespace).Do(ctx); err != nil {
+		var esError *types.ElasticsearchError
+		if errors.As(err, &esError) {
+			cause := esError.ErrorCause
+			if cause.Type == "resource_already_exists_exception" {
+				return true, nil
 			}
-			defer resp.Body.Close()
-
-			if resp.IsError() {
-				body, _ := io.ReadAll(resp.Body)
-				return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
+			if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") {
+				return false, nil
 			}
-			return nil
-		})
+		}
+		return false, err
 	}
-	err := g.Wait()
-	return err == nil, err
+	return true, nil
 }
diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go
index 1249a41c4af..98429cd9247 100644
--- a/internal/beater/server_test.go
+++ b/internal/beater/server_test.go
@@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) {
 
 func TestServerWaitForIntegrationKibana(t *testing.T) {
 	var requests int64
-	requestCh := make(chan struct{})
+	requestCh := make(chan struct{}, 3)
 	mux := http.NewServeMux()
 	mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
@@ -363,6 +363,14 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
 		},
 	})))
 
+	// Send some events to the server. They should be accepted and enqueued.
+	req := makeTransactionRequest(t, srv.URL)
+	req.Header.Add("Content-Type", "application/x-ndjson")
+	resp, err := srv.Client.Do(req)
+	assert.NoError(t, err)
+	assert.Equal(t, http.StatusAccepted, resp.StatusCode)
+	resp.Body.Close()
+
 	timeout := time.After(10 * time.Second)
 	for i := 0; i < 3; i++ {
 		select {
@@ -387,8 +395,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
 
 func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
 	var mu sync.Mutex
-	var tracesRequests int
-	tracesRequestsCh := make(chan int)
+	var createDataStreamRequests int
+	createDataStreamRequestsCh := make(chan int)
 	bulkCh := make(chan struct{}, 1)
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -397,17 +405,24 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
 		// elasticsearch client to send bulk requests.
 		fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
 	})
-	mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
+	mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) {
 		mu.Lock()
 		defer mu.Unlock()
-		template := path.Base(r.URL.Path)
-		if template == "traces-apm" {
-			tracesRequests++
-			if tracesRequests == 1 {
-				w.WriteHeader(404)
-			}
-			tracesRequestsCh <- tracesRequests
+		name := path.Base(r.URL.Path)
+		if name != "traces-apm-testing" {
+			panic("unexpected data stream name: " + name)
+		}
+		createDataStreamRequests++
+		switch createDataStreamRequests {
+		case 1:
+			w.WriteHeader(500)
+		case 2:
+			w.WriteHeader(400)
+			w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`))
+		case 3:
+			w.Write([]byte(`{"acknowledged":true}`))
 		}
+		createDataStreamRequestsCh <- createDataStreamRequests
 	})
 	mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
 		select {
@@ -422,6 +437,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
 		"apm-server": map[string]interface{}{
 			"wait_ready_interval":               "100ms",
 			"data_streams.wait_for_integration": true,
+			"data_streams.namespace":            "testing",
 		},
 		"output.elasticsearch": map[string]interface{}{
 			"hosts":       []string{elasticsearchServer.URL},
@@ -456,8 +472,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
 	var done bool
 	for !done {
 		select {
-		case n := <-tracesRequestsCh:
-			done = n == 2
+		case n := <-createDataStreamRequestsCh:
+			done = n == 3
 		case <-timeout:
 			t.Fatal("timed out waiting for request")
 		}
@@ -471,7 +487,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
 	}
 
 	logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
-	assert.Len(t, logs.All(), 1, "couldn't find remediation message logs")
+	assert.Len(t, logs.All(), 2, "couldn't find remediation message logs")
 
 	// Healthcheck should now report that the server is publish-ready.
 	resp, err = srv.Client.Get(srv.URL + api.RootPath)
@@ -490,7 +506,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) {
 		// elasticsearch client to send bulk requests.
 		fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
 	})
-	mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
+	mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(404)
 	})
 	mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
diff --git a/internal/beater/waitready.go b/internal/beater/waitready.go
index 7bfaa6b04fd..50811a4d688 100644
--- a/internal/beater/waitready.go
+++ b/internal/beater/waitready.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"net/http"
+	"path"
 	"time"
 
 	"go.elastic.co/apm/v2"
@@ -71,27 +72,21 @@ func waitReady(
 }
 
 // waitReadyRoundTripper wraps a *net/http.Transport, ensuring the server's
-// indexing preconditions have been satisfied by waiting for "ready" channel
-// to be signalled, prior to allowing any requests through.
-//
-// This is used to prevent elasticsearch clients from proceeding with requests
-// until the APM integration is installed to ensure we don't index any documents
-// prior to the data stream index templates being ready.
+// indexing preconditions have been satisfied prior to allowing any indexing
+// requests through. This is used to ensure we don't index any documents prior
+// to the data stream index templates being ready.
 type waitReadyRoundTripper struct {
 	*http.Transport
-	ready <-chan struct{}
-	drain <-chan struct{}
+	onBulk func(context.Context) error
 }
 
 var errServerShuttingDown = errors.New("server shutting down")
 
 func (c *waitReadyRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
-	select {
-	case <-c.drain:
-		return nil, errServerShuttingDown
-	case <-c.ready:
-	case <-r.Context().Done():
-		return nil, r.Context().Err()
+	if path.Base(r.URL.Path) == "_bulk" {
+		if err := c.onBulk(r.Context()); err != nil {
+			return nil, err
+		}
 	}
 	return c.Transport.RoundTrip(r)
 }