
Commit 39e03ea

Work around #615 (#667)
* Work around #615, and add support for building with the Static Linux SDK
* Factor out threading abstraction for BatchWorkers and use normal Thread subclass on Apple platforms
1 parent 804d1d7 commit 39e03ea

10 files changed: +128 -39 lines

Package.swift
+3 -18

@@ -35,12 +35,14 @@ let package = Package(
     .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.2"),
     .package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
     .package(url: "https://github.com/apple/swift-metrics.git", from: "2.1.1"),
+    .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0")
   ],
   targets: [
     .target(name: "OpenTelemetryApi",
             dependencies: []),
     .target(name: "OpenTelemetrySdk",
-            dependencies: ["OpenTelemetryApi"].withAtomicsIfNeeded()),
+            dependencies: ["OpenTelemetryApi",
+                           .product(name: "Atomics", package: "swift-atomics", condition: .when(platforms: [.linux]))]),
     .target(name: "OpenTelemetryConcurrency",
             dependencies: ["OpenTelemetryApi"]),
     .target(name: "OpenTelemetryTestUtils",
@@ -142,25 +144,8 @@ let package = Package(
   ]
 ).addPlatformSpecific()
 
-extension [Target.Dependency] {
-    func withAtomicsIfNeeded() -> [Target.Dependency] {
-        #if canImport(Darwin)
-        return self
-        #else
-        var dependencies = self
-        dependencies.append(.product(name: "Atomics", package: "swift-atomics"))
-        return dependencies
-        #endif
-    }
-}
-
 extension Package {
     func addPlatformSpecific() -> Self {
-        #if !canImport(Darwin)
-        dependencies.append(
-            .package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0"))
-        )
-        #endif
         #if canImport(ObjectiveC)
         dependencies.append(
             .package(url: "https://github.com/undefinedlabs/opentracing-objc", from: "0.5.2")

Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift

+5 -1

@@ -33,8 +33,12 @@
 
 #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
 import Darwin
-#else
+#elseif canImport(Glibc)
 import Glibc
+#elseif canImport(Musl)
+import Musl
+#else
+#error("Unsupported platform")
 #endif
 
 /// A threading lock based on `libpthread` instead of `libdispatch`.
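The same Glibc/Musl import ladder is applied to each of the vendored lock files in this commit; Musl is the C library the Static Linux SDK builds against. As a rough sketch of what sits behind such an import block, a simplified pthread-backed lock might look like the following (illustration only, not the vendored type):

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl // C library used by the Static Linux SDK
#else
#error("Unsupported platform")
#endif

// Simplified pthread-backed lock; the vendored files carry fuller implementations.
final class PThreadLock {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)

    init() { pthread_mutex_init(mutex, nil) }
    deinit {
        pthread_mutex_destroy(mutex)
        mutex.deallocate()
    }

    func lock() { pthread_mutex_lock(mutex) }
    func unlock() { pthread_mutex_unlock(mutex) }
}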

Sources/Importers/OpenTracingShim/Locks.swift

+5 -1

@@ -33,8 +33,12 @@
 
 #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
 import Darwin
+#elseif canImport(Glibc)
+import Glibc
+#elseif canImport(Musl)
+import Musl
 #else
-import Glibc
+#error("Unsupported platform")
 #endif
 
 /// A threading lock based on `libpthread` instead of `libdispatch`.

Sources/Importers/SwiftMetricsShim/Locks.swift

+5 -1

@@ -33,8 +33,12 @@
 
 #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
 import Darwin
-#else
+#elseif canImport(Glibc)
 import Glibc
+#elseif canImport(Musl)
+import Musl
+#else
+#error("Unsupported platform")
 #endif
 
 /// A threading lock based on `libpthread` instead of `libdispatch`.
(new file)
+31 -0

@@ -0,0 +1,31 @@
+import Foundation
+
+#if (swift(>=6) && os(Linux)) || OPENTELEMETRY_SWIFT_LINUX_COMPAT
+// https://github.com/open-telemetry/opentelemetry-swift/issues/615 prevents Linux builds from succeeding due to a regression in Swift 6 when subclassing Thread. We can work around this by using a block based Thread.
+class WorkerThread {
+  var thread: Thread!
+
+  var isCancelled: Bool {
+    self.thread.isCancelled
+  }
+
+  init() {
+    self.thread = Thread(block: { [weak self] in
+      self?.main()
+    })
+  }
+
+  func main() {}
+
+  func start() {
+    self.thread.start()
+  }
+
+  func cancel() {
+    self.thread.cancel()
+  }
+}
+#else
+// Builds using a Swift older than 5 or on a non-Linux OS should be able to use the normal Thread subclass
+class WorkerThread: Thread {}
+#endif
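For orientation, a subclass is expected to use the abstraction the same way it used Thread before: override main() and poll isCancelled. A small sketch with a hypothetical ExampleWorker (the commit's real subclasses are the BatchWorker classes below):

import Foundation

// Illustrative subclass only; not part of the commit.
final class ExampleWorker: WorkerThread {
    override func main() {
        // Runs on the worker thread: inside the block-based Thread on Linux with
        // Swift 6, directly on the Thread subclass everywhere else.
        repeat {
            // ... perform one unit of batched work ...
            Thread.sleep(forTimeInterval: 0.5)
        } while !isCancelled
    }
}

let worker = ExampleWorker()
worker.start()
// ... later ...
worker.cancel() // flips isCancelled; the loop exits at its next check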

Sources/OpenTelemetrySdk/Internal/Locks.swift

+5 -1

@@ -33,8 +33,12 @@
 
 #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
 import Darwin
-#else
+#elseif canImport(Glibc)
 import Glibc
+#elseif canImport(Musl)
+import Musl
+#else
+#error("Unsupported platform")
 #endif
 
 /// A threading lock based on `libpthread` instead of `libdispatch`.

Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift

+15 -15

@@ -1,7 +1,7 @@
 //
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
-//
+//
 
 import Foundation
 import OpenTelemetryApi
@@ -36,7 +36,7 @@ public class BatchLogRecordProcessor: LogRecordProcessor {
   }
 }
 
-private class BatchWorker: Thread {
+private class BatchWorker: WorkerThread {
   let logRecordExporter: LogRecordExporter
   let scheduleDelay: TimeInterval
   let maxQueueSize: Int
@@ -69,7 +69,7 @@ private class BatchWorker: Thread {
 
   func emit(logRecord: ReadableLogRecord) {
     cond.lock()
-    defer { cond.unlock()}
+    defer { cond.unlock() }
     if logRecordList.count == maxQueueSize {
       // TODO: record a counter for dropped logs
       return
@@ -84,18 +84,18 @@ private class BatchWorker: Thread {
 
   override func main() {
     repeat {
-      autoreleasepool {
-        var logRecordsCopy: [ReadableLogRecord]
-        cond.lock()
-        if logRecordList.count < maxExportBatchSize {
-          repeat {
-            cond.wait(until: Date().addingTimeInterval(scheduleDelay))
-          } while logRecordList.isEmpty && !self.isCancelled
-        }
-        logRecordsCopy = logRecordList
-        logRecordList.removeAll()
-        cond.unlock()
-        self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
+      autoreleasepool {
+        var logRecordsCopy: [ReadableLogRecord]
+        cond.lock()
+        if logRecordList.count < maxExportBatchSize {
+          repeat {
+            cond.wait(until: Date().addingTimeInterval(scheduleDelay))
+          } while logRecordList.isEmpty && !self.isCancelled
+        }
+        logRecordsCopy = logRecordList
+        logRecordList.removeAll()
+        cond.unlock()
+        self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
       }
     } while !self.isCancelled
  }
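The worker loop above is a standard NSCondition producer/consumer handoff: emitters append under the lock, and the worker waits with a deadline so a batch is exported after scheduleDelay even if the queue never fills. A minimal sketch of that handoff in isolation, with illustrative names rather than the SDK's types:

import Foundation

// Illustrative only; the SDK's BatchWorker adds queue and batch size limits,
// cancellation checks and export timeouts on top of this pattern.
final class HandoffQueue<Element> {
    private let cond = NSCondition()
    private var items: [Element] = []

    func push(_ item: Element) {
        cond.lock()
        items.append(item)
        cond.broadcast() // wake a waiting consumer
        cond.unlock()
    }

    // Returns whatever has accumulated, waiting at most `delay` seconds for
    // the first item, mirroring the scheduleDelay-bounded wait above.
    func drain(waitingUpTo delay: TimeInterval) -> [Element] {
        cond.lock()
        if items.isEmpty {
            cond.wait(until: Date().addingTimeInterval(delay))
        }
        let batch = items
        items.removeAll()
        cond.unlock()
        return batch
    }
}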

Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift

+1 -1

@@ -71,7 +71,7 @@ public struct BatchSpanProcessor: SpanProcessor {
 /// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export
 /// the data.
 /// The list of batched data is protected by a NSCondition which ensures full concurrency.
-private class BatchWorker: Thread {
+private class BatchWorker: WorkerThread {
   let spanExporter: SpanExporter
   let meterProvider: StableMeterProvider?
   let scheduleDelay: TimeInterval
Tests/OpenTelemetrySdkTests/Logs/BatchLogRecordProcessorTests.swift

+25 -0

@@ -98,6 +98,31 @@ class BatchLogRecordProcessorTests : XCTestCase {
     let exported = waitingExporter.waitForExport()
     XCTAssertEqual(exported?.count, 9)
   }
+
+  func testShutdownNoMemoryCycle() {
+    // A weak reference to the exporter that will be retained by the BatchWorker
+    weak var exporter: WaitingLogRecordExporter?
+    do {
+      let waitingExporter = WaitingLogRecordExporter(numberToWaitFor: 2)
+      exporter = waitingExporter
+      let processors = [BatchLogRecordProcessor(logRecordExporter: waitingExporter,scheduleDelay: maxScheduleDelay)]
+      let loggerProvider = LoggerProviderBuilder().with(processors: processors).build()
+      let logger = loggerProvider.get(instrumentationScopeName: "BatchLogRecordProcessorTest")
+      logger.logRecordBuilder().emit()
+      logger.logRecordBuilder().emit()
+      let exported = waitingExporter.waitForExport()
+      XCTAssertEqual(exported?.count, 2)
+
+      for processor in processors {
+        _ = processor.shutdown()
+      }
+    }
+
+    // After the BatchWorker is shutdown, it will continue waiting for the condition variable to be signaled up to the maxScheduleDelay. Until that point the exporter won't be deallocated.
+    sleep(UInt32(ceil(maxScheduleDelay + 1)))
+    // Interestingly, this will always succeed on macOS even if you intentionally create a strong reference cycle between the BatchWorker and the Thread's closure. I assume either calling cancel or the thread exiting releases the closure which breaks the cycle. This is not the case on Linux where the test will fail as expected.
+    XCTAssertNil(exporter)
+  }
 }
 
 
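The new test relies on a weak-reference check to prove that shutdown releases the exporter (the span-processor test below follows the same pattern). Stripped of the SDK specifics, the technique is roughly this generic sketch:

import XCTest

final class DeallocationPatternTests: XCTestCase {
    func testObjectIsReleasedAfterUse() {
        weak var weakRef: NSObject?
        // Scope the only strong reference so it is gone before the assertion.
        do {
            let object = NSObject()
            weakRef = object
            // ... hand `object` to the code under test and exercise it ...
        }
        // Fails if anything (for example a retain cycle through a thread's
        // closure) is still holding the object strongly.
        XCTAssertNil(weakRef)
    }
}
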
Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift

+33 -1

@@ -11,7 +11,7 @@ class BatchSpansProcessorTests: XCTestCase {
   let spanName1 = "MySpanName/1"
   let spanName2 = "MySpanName/2"
   let maxScheduleDelay = 0.5
-  let tracerSdkFactory = TracerProviderSdk()
+  var tracerSdkFactory = TracerProviderSdk()
   var tracer: Tracer!
   let blockingSpanExporter = BlockingSpanExporter()
   var mockServiceHandler = SpanExporterMock()
@@ -227,6 +227,38 @@ class BatchSpansProcessorTests: XCTestCase {
     XCTAssertEqual(exported, [span2.toSpanData()])
     XCTAssertTrue(waitingSpanExporter.shutdownCalled)
   }
+
+  func testShutdownNoMemoryCycle() {
+    // A weak reference to the exporter that will be retained by the BatchWorker
+    weak var exporter: WaitingSpanExporter?
+    do {
+      let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
+      exporter = waitingSpanExporter
+
+      let batch = BatchSpanProcessor(
+        spanExporter: waitingSpanExporter,
+        meterProvider: DefaultStableMeterProvider.instance,
+        scheduleDelay: maxScheduleDelay
+      )
+
+      tracerSdkFactory.addSpanProcessor(batch)
+      let span1 = createSampledEndedSpan(spanName: spanName1)
+      let span2 = createSampledEndedSpan(spanName: spanName2)
+      let exported = waitingSpanExporter.waitForExport()
+
+      XCTAssertEqual(exported, [span1.toSpanData(), span2.toSpanData()])
+
+      tracerSdkFactory.shutdown()
+      // Both the provider and the tracer retain the exporter, so we need to clear those out in order for the exporter to be deallocated.
+      tracerSdkFactory = TracerProviderSdk()
+      tracer = nil
+    }
+
+    // After the BatchWorker is shutdown, it will continue waiting for the condition variable to be signaled up to the maxScheduleDelay. Until that point the exporter won't be deallocated.
+    sleep(UInt32(ceil(maxScheduleDelay + 1)))
+    // Interestingly, this will always succeed on macOS even if you intentionally create a strong reference cycle between the BatchWorker and the Thread's closure. I assume either calling cancel or the thread exiting releases the closure which breaks the cycle. This is not the case on Linux where the test will fail as expected.
+    XCTAssertNil(exporter)
+  }
 }
 
 class BlockingSpanExporter: SpanExporter {
