Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coroutines: send events flow #92

Merged
merged 40 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5c999cc
refactor: extract json mapping logic to Kotlin object
wzieba Nov 9, 2023
de701d2
refactor: introduce `SendEvents` use case with same logic as `Parsely…
wzieba Nov 9, 2023
4ae78b2
refactor: migrate `ParselyAPIConnection` to Coroutines
wzieba Nov 9, 2023
920e995
test: update `ParselyAPIConnectionTest` to support Coroutines
wzieba Nov 9, 2023
17e7ecd
tests: remove unit tests that checks for `GET` request on empty payload
wzieba Nov 9, 2023
5bff337
tests: update `ROOT_URL` before `ParselyTracker` constructor
wzieba Nov 9, 2023
51c89b9
feat: closing the connection after successful request
wzieba Nov 9, 2023
479d262
feat: close the connection after successful request
wzieba Nov 9, 2023
742fc5d
feat: make `ParselyAPIConnection` return `Result`
wzieba Nov 9, 2023
9668a91
refactor: move handling HTTP request results to `SendEvents`
wzieba Nov 9, 2023
b7c98a7
tests: update unit tests to reflect current scope of work of `Parsely…
wzieba Nov 9, 2023
1be8dfc
Merge branch 'coroutines' into coroutines-send-event
wzieba Nov 9, 2023
038c0ab
refactor: use `LocalStorageRepository` methods directly
wzieba Nov 10, 2023
f294f73
refactor: use `FlushManager` methods directly
wzieba Nov 11, 2023
c93b567
fix: on successful request, remove only sent events
wzieba Nov 11, 2023
debd023
style: remove unused methods of `LocalStorageRepository`
wzieba Nov 11, 2023
940b241
tests: add tests for SendEvents usecase
wzieba Nov 11, 2023
fa73ccd
refactor: move serialization details to `JsonSerializer`
wzieba Nov 11, 2023
fab4007
fix: do not stop flush manager if local queue is not empty
wzieba Nov 11, 2023
4237fcd
fix: add mutual execution for `SendEvents#invoke`
wzieba Nov 11, 2023
69f12b3
refactor: remove `FlushQueue` AsyncTask
wzieba Nov 11, 2023
ca330ab
style: remove unused methods
wzieba Nov 11, 2023
8d4edd4
fix: make `LocalStorageRepository#getStoredQueue` thread safe and off…
wzieba Nov 11, 2023
643defe
fix: do not create a deadlock
wzieba Nov 11, 2023
39f8831
fix: do not specify context for `ParselyAPIConnection`
wzieba Nov 13, 2023
dbc73e1
tests: fix loading an empty file for `ParselyAPIConnectionTest`
wzieba Nov 13, 2023
366dae5
refactor: rewrite SdkInit to simple function
wzieba Nov 17, 2023
acae5f6
refactor: introduce `FlushManager` interface
wzieba Nov 17, 2023
77303c6
refactor: introduce `QueueManager` interface
wzieba Nov 17, 2023
be74b28
refactor: introduce `QueueManager` interface
wzieba Nov 17, 2023
a13485c
refactor: introduce `RestClient` interface
wzieba Nov 17, 2023
cb5113c
feat: start `FlushManager` without checking state of stored queue first
wzieba Nov 17, 2023
4b4e471
refactor: pass `onFlush` lambda to `ParselyFlushManager`
wzieba Nov 17, 2023
c9872d4
refactor: rename `SendEvents` to `FlushQueue` and `isDebug` to `skipS…
wzieba Nov 17, 2023
269a9d6
style: improve position of logging statements in `FlushQueue`
wzieba Nov 17, 2023
409c42d
style: return from `FlushQueue` if `skipSendingEvents`
wzieba Nov 17, 2023
95f7a65
style: make variables test-local where possible
wzieba Nov 24, 2023
c32302f
feat: update lock logic on local storage repo
wzieba Nov 24, 2023
2c0ce6e
style: remove unnecessary `else` from `FlushQueue`
wzieba Nov 24, 2023
784a021
feat: do not stop FlushManager on successful flush
wzieba Nov 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions example/src/main/java/com/example/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,18 @@ protected void onCreate(Bundle savedInstanceState) {
// Set debugging to true so we don't actually send things to Parse.ly
ParselyTracker.sharedInstance().setDebug(true);

final TextView queueView = (TextView) findViewById(R.id.queue_size);
queueView.setText(String.format("Queued events: %d", ParselyTracker.sharedInstance().queueSize()));

final TextView storedView = (TextView) findViewById(R.id.stored_size);
storedView.setText(String.format("Stored events: %d", ParselyTracker.sharedInstance().storedEventsCount()));

final TextView intervalView = (TextView) findViewById(R.id.interval);
storedView.setText(String.format("Flush interval: %d", ParselyTracker.sharedInstance().getFlushInterval()));

updateEngagementStrings();

final TextView views[] = new TextView[3];
views[0] = queueView;
views[1] = storedView;
views[2] = intervalView;
final TextView views[] = new TextView[1];
views[0] = intervalView;

final Handler mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
TextView[] v = (TextView[]) msg.obj;
TextView qView = v[0];
qView.setText(String.format("Queued events: %d", ParselyTracker.sharedInstance().queueSize()));

TextView sView = v[1];
sView.setText(String.format("Stored events: %d", ParselyTracker.sharedInstance().storedEventsCount()));

TextView iView = v[2];
TextView iView = v[0];
if (ParselyTracker.sharedInstance().flushTimerIsActive()) {
iView.setText(String.format("Flush Interval: %d", ParselyTracker.sharedInstance().getFlushInterval()));
} else {
Expand Down
19 changes: 2 additions & 17 deletions example/src/main/res/layout/activity_main.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,11 @@
android:onClick="trackReset"
android:text="@string/button_reset_video" />

<TextView
android:id="@+id/queue_size"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_below="@+id/reset_video_button"
android:layout_centerHorizontal="true"
android:text="Queued events: 0" />

<TextView android:id="@+id/stored_size"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_centerHorizontal="true"
android:layout_below="@id/queue_size"
android:text="Stored events: 0"/>

<TextView android:id="@+id/interval"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_centerHorizontal="true"
android:layout_below="@id/stored_size"
android:layout_below="@id/reset_video_button"
android:text="Flush timer inactive"/>

<TextView
Expand All @@ -107,4 +92,4 @@
android:text="Video is inactive." />


</RelativeLayout>
</RelativeLayout>
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,12 @@ class FunctionalTests {
activity: Activity,
flushInterval: Duration = defaultFlushInterval
): ParselyTracker {
val field: Field = ParselyTracker::class.java.getDeclaredField("ROOT_URL")
field.isAccessible = true
field.set(this, url)
return ParselyTracker.sharedInstance(
siteId, flushInterval.inWholeSeconds.toInt(), activity.application
).apply {
val f: Field = this::class.java.getDeclaredField("ROOT_URL")
f.isAccessible = true
f.set(this, url)
}
)
}

private companion object {
Expand Down
25 changes: 17 additions & 8 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,35 @@ import kotlinx.coroutines.launch
* Handles stopping and starting the flush timer. The flush timer
* controls how often we send events to Parse.ly servers.
*/
internal class FlushManager(
private val parselyTracker: ParselyTracker,
val intervalMillis: Long,
internal interface FlushManager {
fun start()
fun stop()
val isRunning: Boolean
val intervalMillis: Long
}

internal class ParselyFlushManager(
private val onFlush: () -> Unit,
override val intervalMillis: Long,
private val coroutineScope: CoroutineScope
) {
) : FlushManager {
private var job: Job? = null

fun start() {
override fun start() {
if (job?.isActive == true) return

job = coroutineScope.launch {
while (isActive) {
delay(intervalMillis)
parselyTracker.flushEvents()
onFlush.invoke()
}
}
}

fun stop() = job?.cancel()
override fun stop() {
job?.cancel()
}

val isRunning: Boolean
override val isRunning: Boolean
get() = job?.isActive ?: false
}
51 changes: 51 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.parsely.parselyandroid

import com.parsely.parselyandroid.JsonSerializer.toParselyEventsPayload
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class FlushQueue(
private val flushManager: FlushManager,
private val repository: QueueRepository,
private val restClient: RestClient,
private val scope: CoroutineScope
) {

private val mutex = Mutex()

operator fun invoke(skipSendingEvents: Boolean) {
scope.launch {
mutex.withLock {
val eventsToSend = repository.getStoredQueue()

if (eventsToSend.isEmpty()) {
flushManager.stop()
return@launch
}

if (skipSendingEvents) {
ParselyTracker.PLog("Debug mode on. Not sending to Parse.ly. Otherwise, would sent ${eventsToSend.size} events")
repository.remove(eventsToSend)
return@launch
Comment on lines +28 to +31
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to stop the flushManager as well? I guess it'll get stopped the next time it's flushed. 🤔

I wonder if we should skip starting the FlushQueue all together in a case where we don't want the events to be sent. This is probably beyond the scope of this PR though, so I am just making a note of it here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed stopping flushManager from this part of code in 784a021.

I wonder if we should skip starting the FlushQueue all together in a case where we don't want the events to be sent.

I see your point but the skipSendingEvents option is used only for tests (debug mode). As it's like --dry-run, I wouldn't opt to remove starting flushManager then, as this would cause a significant discrepancy between debug and non-debug modes of SDK. In other words: in debug mode, I believe we want to be as close as possible to non-debug. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point but the skipSendingEvents option is used only for tests (debug mode). As it's like --dry-run, I wouldn't opt to remove starting flushManager then, as this would cause a significant discrepancy between debug and non-debug modes of SDK. In other words: in debug mode, I believe we want to be as close as possible to non-debug. WDYT?

That makes sense and sounds like the more practical option 👍

}
ParselyTracker.PLog("Sending request with %d events", eventsToSend.size)
val jsonPayload = toParselyEventsPayload(eventsToSend)
ParselyTracker.PLog("POST Data %s", jsonPayload)
ParselyTracker.PLog("Requested %s", ParselyTracker.ROOT_URL)
restClient.send(jsonPayload)
.fold(
onSuccess = {
ParselyTracker.PLog("Pixel request success")
repository.remove(eventsToSend)
},
onFailure = {
ParselyTracker.PLog("Pixel request exception")
ParselyTracker.PLog(it.toString())
}
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlinx.coroutines.sync.withLock

internal class InMemoryBuffer(
private val coroutineScope: CoroutineScope,
private val localStorageRepository: LocalStorageRepository,
private val localStorageRepository: QueueRepository,
private val onEventAddedListener: () -> Unit,
) {

Expand Down
32 changes: 32 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/JsonSerializer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.parsely.parselyandroid

import com.fasterxml.jackson.databind.ObjectMapper
import java.io.IOException
import java.io.StringWriter

internal object JsonSerializer {

fun toParselyEventsPayload(eventsToSend: List<Map<String, Any?>?>): String {
val batchMap: MutableMap<String, Any> = HashMap()
batchMap["events"] = eventsToSend
return toJson(batchMap).orEmpty()
}
/**
* Encode an event Map as JSON.
*
* @param map The Map object to encode as JSON.
* @return The JSON-encoded value of `map`.
*/
private fun toJson(map: Map<String, Any>): String? {
val mapper = ObjectMapper()
var ret: String? = null
try {
val strWriter = StringWriter()
mapper.writeValue(strWriter, map)
ret = strWriter.toString()
} catch (e: IOException) {
e.printStackTrace()
}
return ret
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import java.io.EOFException
import java.io.FileNotFoundException
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal open class LocalStorageRepository(private val context: Context) {
internal interface QueueRepository {
suspend fun remove(toRemove: List<Map<String, Any?>?>)
suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?>
suspend fun insertEvents(toInsert: List<Map<String, Any?>?>)
}

internal class LocalStorageRepository(private val context: Context) : QueueRepository {

private val mutex = Mutex()

Expand All @@ -33,23 +38,7 @@ internal open class LocalStorageRepository(private val context: Context) {
}
}

/**
* Delete the stored queue from persistent storage.
*/
fun purgeStoredQueue() {
persistObject(ArrayList<Map<String, Any>>())
}

fun remove(toRemove: List<Map<String, Any>>) {
persistObject(getStoredQueue() - toRemove.toSet())
}

/**
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open fun getStoredQueue(): ArrayList<Map<String, Any?>?> {
private fun getInternalStoredQueue(): ArrayList<Map<String, Any?>?> {
var storedQueue: ArrayList<Map<String, Any?>?> = ArrayList()
try {
val fis = context.applicationContext.openFileInput(STORAGE_KEY)
Expand All @@ -71,19 +60,26 @@ internal open class LocalStorageRepository(private val context: Context) {
return storedQueue
}

override suspend fun remove(toRemove: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(storedEvents - toRemove.toSet())
}

/**
* Delete an event from the stored queue.
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open fun expelStoredEvent() {
val storedQueue = getStoredQueue()
storedQueue.removeAt(0)
override suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?> = mutex.withLock {
getInternalStoredQueue()
}

/**
* Save the event queue to persistent storage.
*/
open suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
persistObject(ArrayList((toInsert + getStoredQueue()).distinct()))
override suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(ArrayList((toInsert + storedEvents).distinct()))
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,32 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
@file:Suppress("DEPRECATION")
package com.parsely.parselyandroid

import android.os.AsyncTask
import java.net.HttpURLConnection
import java.net.URL

internal class ParselyAPIConnection(private val tracker: ParselyTracker) : AsyncTask<String?, Exception?, Void?>() {
private var exception: Exception? = null
internal interface RestClient {
suspend fun send(payload: String): Result<Unit>
}

@Deprecated("Deprecated in Java")
override fun doInBackground(vararg data: String?): Void? {
internal class ParselyAPIConnection(private val url: String) : RestClient {
override suspend fun send(payload: String): Result<Unit> {
var connection: HttpURLConnection? = null
try {
if (data.size == 1) { // non-batched (since no post data is included)
connection = URL(data[0]).openConnection() as HttpURLConnection
connection.inputStream
} else if (data.size == 2) { // batched (post data included)
connection = URL(data[0]).openConnection() as HttpURLConnection
connection.doOutput = true // Triggers POST (aka silliest interface ever)
connection.setRequestProperty("Content-Type", "application/json")
val output = connection.outputStream
output.write(data[1]?.toByteArray())
output.close()
connection.inputStream
}
connection = URL(url).openConnection() as HttpURLConnection
connection.doOutput = true
connection.setRequestProperty("Content-Type", "application/json")
val output = connection.outputStream
output.write(payload.toByteArray())
output.close()
connection.inputStream
} catch (ex: Exception) {
exception = ex
return Result.failure(ex)
} finally {
connection?.disconnect()
}
return null
}

@Deprecated("Deprecated in Java")
override fun onPostExecute(result: Void?) {
if (exception != null) {
ParselyTracker.PLog("Pixel request exception")
ParselyTracker.PLog(exception.toString())
} else {
ParselyTracker.PLog("Pixel request success")

// only purge the queue if the request was successful
tracker.purgeEventsQueue()
ParselyTracker.PLog("Event queue empty, flush timer cleared.")
tracker.stopFlushTimer()
}
return Result.success(Unit)
}
}
Loading