Office Hours Transcript: 2021-05-04

Jan joined

hello, Jan!

let me know if you run into problems with this chat system – I am concerned that my previous one might be discontinued in the coming months

Hi, Mark, I have a SharedFlow. The code only works in the debugger, and only if I set breakpoints inside the takeWhile and collect brackets.

suspend fun validatePin(pin: String): Boolean {
        var response = ""
        streamCommand(ValidatePin(pin), pinCommandChar)
        val values = pinResponseChannel.pinResponseEvents
        values.takeWhile {
            val val1 = it.value[it.value.size - 1].compareTo(0x0a) != 0
            if ( val1 == false ) {
                // add last value that won't be collected
                response = readCharacteristicInFlow2(it, response)
            }
            val1
        }.collect {
            val val1 = it.value[it.value.size - 1].compareTo(0x0a) != 0
            response = readCharacteristicInFlow2(it, response)
        }
        return ( response == "success" )
}

what is the experience outside of the debugger?

Code hangs. No data gets collected.

well, collect() is a suspend function, and it will not return until your flow is closed, and your flow will never be closed

so, that return statement will never be executed, and this function will never return

This is a sharedflow. I don’t think you can close those.


so collect() needs to go in its own coroutine, without any code after it

perhaps collect() isn’t really what you want
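To illustrate the point about collect() never returning on a flow that never completes, here is a minimal sketch. It assumes a recent kotlinx.coroutines release; the function names and the 200 ms timeout are hypothetical and only there to make the behavior observable.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical demo: returns true if collect() was still suspended when the
// timeout fired, even though the one replayed value had been delivered.
suspend fun collectNeverReturns(): Boolean {
    val events = MutableSharedFlow<Int>(replay = 1)
    events.emit(1) // no subscribers yet, so the value just lands in the replay cache

    var seen = 0
    // collect() on a SharedFlow has no natural end; only cancellation stops it.
    val finished: Unit? = withTimeoutOrNull<Unit>(200) {
        events.collect { seen += 1 }
    }
    return finished == null && seen == 1
}

// A terminal operator such as first() stops collecting once it has its value.
suspend fun firstReturns(): Int {
    val events = MutableSharedFlow<Int>(replay = 1)
    events.emit(42)
    return events.first()
}
```

Any code placed after the collect() call in the first function would only run because the timeout cancels the collection.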

I just tested this change:
values.transformWhile { emit(it); it.value[it.value.size - 1].compareTo(0x0a) != 0 }.collect {
    val val1 = it.value[it.value.size - 1].compareTo(0x0a) != 0
    response = readCharacteristicInFlow2(it, response)
}

And the transformWhile works slightly better. Don’t need debugger breakpoints but I have to make the call twice.

I guess I do not understand the business rules here. It feels like you are expecting the collect() lambda to be called exactly once, and if that is the case, I do not know why you have a channel or a flow here.

if you only want the first item out of takeWhile(), then perhaps you should be using a different terminal operator than collect(), such as first()

The stream sends the command to the bluetooth device to validate its pin. When the bluetooth device returns the validation, that gets emitted to the shared flow.

The device only sends in 16 byte buffers so sometimes it takes a "loop" to get the full response back. I used to do a for loop on the channel.

But then switched to flows.

So first() works when I only have one 16 byte buffer but not when I have 30 of them.
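A minimal sketch of one way to handle both the one-buffer and the 30-buffer case: emit every buffer, stop after the one that ends in a line feed, and join the pieces. It assumes the buffers arrive as a Flow<ByteArray> of ASCII text; readResponse and LINE_FEED are hypothetical names, not part of Jan's code. Note that in kotlinx.coroutines, fold() is a terminal operator, so it only returns here because transformWhile() ends the flow.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.transformWhile

const val LINE_FEED: Byte = 0x0A

// Hypothetical helper: gathers buffers up to and including the one that ends
// in a line feed, then returns the joined text without the trailing line feed.
suspend fun readResponse(buffers: Flow<ByteArray>): String =
    buffers
        .transformWhile { buffer ->
            emit(buffer) // keep the terminating buffer too
            // Returning false completes the flow after this buffer.
            buffer.isEmpty() || buffer.last() != LINE_FEED
        }
        .fold(StringBuilder()) { acc, buffer -> acc.append(buffer.decodeToString()) }
        .toString()
        .trimEnd('\n')
```

Because the flow completes as soon as the terminator is seen, this works the same way whether the response fits in one buffer or spans many.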

Scott joined

where am i

(BTW, hello, Scott! I will be with you shortly, and let me know if you have problems with this chat system!)


Jan: it feels like you should have a channel or flow that is responsible for assembling complete messages and then emitting those complete messages downstream

and that the "assembling complete messages’

That is what this code is doing.

no, it is looking for a specific response

at least, as far as I can tell from your code

The transformWhile or takeWhile is looking for a specific response to know when the "assembling complete message" is completed.



The transformWhile seems to need an emit/collect so I guess I have to give up on a return from this function and somehow put it into its own launch. Do you agree?

that’s what I meant by collect() needs to be in its own coroutine

let me take a question from Scott, and I will be back with you in a bit

Okay. I’ll see if I can get that to work. But I really have no clue how the UI that is waiting on this to complete is going to figure that out on a launched coroutine.

Scott: over to you! how can I help you today?


I’m wondering if it’s possible to route all traffic from my app through a proxy

without rooting the device

what exactly is "all traffic" and what exactly is "a proxy"?

for example, do you mean Web requests and a Web proxy?

do you mean all IP packets and a VPN?

and is this something you are looking to do in production, or is this a test/debugging sort of thing?

proof of concept now to see if we can meet a potential client’s requirements

I don’t know much about this topic which is why I’m here

my understanding is that they want this for privacy reasons.

We aren’t sure if we can control the flow of data through all of our dependencies

oh, most likely, you can’t

unless you specifically choose dependencies that grant you that sort of control

the library would have to allow me to pass it the details of the proxy server right?

or let you supply the OkHttpClient or something that it uses
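As a rough sketch of what "passing proxy details" looks like for Web requests only: the JDK represents a proxy as a java.net.Proxy, and OkHttp's OkHttpClient.Builder.proxy() accepts one. The helper name, host, and port below are placeholders, not values from this discussion.

```kotlin
import java.net.InetSocketAddress
import java.net.Proxy

// Hypothetical helper: describes an HTTP proxy without resolving the host name yet.
fun httpProxy(host: String, port: Int): Proxy =
    Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(host, port))

// Possible uses (assuming the library lets you reach its HTTP client):
//   java.net.URL("https://example.com/").openConnection(httpProxy("proxy.example.test", 8080))
//   OkHttpClient.Builder().proxy(httpProxy("proxy.example.test", 8080)).build()
```

A setting like this only affects the HTTP client it is applied to; traffic that a dependency sends through its own networking stack is not covered by it.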

and that would assume only worrying about Web request

er, requests

I cannot rule out an in-app solution, if they have a very narrow scope of "all traffic" and "a proxy"

we’re using OpenTok (vonage), so it’s webrtc

but, on the whole, this feels like an IT question, not a programming question

ok. I’ll keep investigating.

That’s all for me today. Thank you!

sorry I didn’t have a better answer for you!

no worries

Jan: back to you!

Scott left

Would a StateFlow be a good way to communicate back to the UI that the collecting is finished?

that’s one possibility
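A minimal sketch of the StateFlow option, assuming the UI observes a small set of states. PinState, PinValidator, and the injected validate function are hypothetical names for illustration; they are not from Jan's code.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical UI-facing states.
enum class PinState { IDLE, VALIDATING, VALID, INVALID }

class PinValidator(private val validate: suspend (String) -> Boolean) {
    private val _state = MutableStateFlow(PinState.IDLE)

    // Read-only view for the UI to collect.
    val state: StateFlow<PinState> = _state.asStateFlow()

    // Call from a launched coroutine; the UI learns the outcome through `state`.
    suspend fun submit(pin: String) {
        _state.value = PinState.VALIDATING
        _state.value = if (validate(pin)) PinState.VALID else PinState.INVALID
    }
}
```

The UI would collect state in its own coroutine and react to VALID or INVALID, so it never needs a return value from the code doing the collecting.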

another possibility is to fix your current logic, such that you use intermediate operators (e.g., filter()) to get things down to a single yes/no response, then use single() to get that result

It’s funny that absolutely no books or online examples are showing the use of launch with collect.

that is the very first sample from my book, and it shows launch() with collect()

I wish I had yes/no responses. I do on validate pin but other requests get a long list of network ssids.

OK, all I have to work on is the validate-PIN scenario

Okay. I’ll also take another look at the cancellable coroutines that you pointed out. But I think StateFlow could be easiest as I don’t really understand all the cancellable coroutines stuff. I tried using flowOn thinking that might work but it doesn’t work on SharedFlow.

is everything on a request-response pattern? IOW, does the "long list of network ssids" eventually give you a complete result to forward along?

or is the long list of SSIDs really a continuous stream with no actual end?

Yes it does. I have that one tested already using the same transformWhile logic as above. And I haven’t put that one in its own launch yet, so the UI responds by displaying the list. So I guess launch isn’t always needed but probably safest for consistency.

then the validate-PIN and the long-list-of-SSIDs seem to be the same basic pattern:

  • get 16-byte buffers
  • assemble those into messages
  • filter out to get the message that you want
  • process that one message
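The four steps above can be sketched as a small pipeline. This is only an illustration under stated assumptions: buffers arrive as a Flow<ByteArray> of ASCII text, a line feed (0x0A) ends each message, and assembleMessages and awaitMessage are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow

// Steps 1-2: turn a stream of buffers into a stream of complete messages.
fun Flow<ByteArray>.assembleMessages(): Flow<String> = flow {
    val pending = StringBuilder()
    collect { buffer ->
        for (byte in buffer) {
            if (byte == 0x0A.toByte()) {
                emit(pending.toString()) // a full message is ready
                pending.clear()
            } else {
                pending.append((byte.toInt() and 0xFF).toChar())
            }
        }
    }
}

// Steps 3-4: keep only the message of interest and hand back the first match.
suspend fun Flow<ByteArray>.awaitMessage(wanted: (String) -> Boolean): String =
    assembleMessages().filter { wanted(it) }.first()
```

The sketch uses first() because it returns as soon as a matching message arrives, which matters when the upstream flow never completes on its own.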

Not continuous: the end is the 0x0A (line feed character) which I give to the "when" part of the clause on the flow. Yes you have that exactly right.

the first three of those bullets are intermediate operators, such as filter(), scan(), fold(), etc., to get to your single expected result

you then can consume that with single() and have it be wrapped in a suspend function returning that result

this assumes that you can "drop on the floor" other messages that do not concern you, because other consumers are watching for those

I have the filter already on so each consumer is already only getting their message:
val pinResponseEvents = _events.asSharedFlow() // publicly exposed as read-only shared flow
val wifiResponseEvents = _events.asSharedFlow() // publicly exposed as read-only shared flow
val wifiStatusResponseEvents = _events.asSharedFlow() // publicly exposed as read-only shared flow

fun responseEvent(characteristic: BGC, changeType: CharacteristicChange) {
    when (changeType) {
        CharacteristicChange.PIN -> pinResponseEvents.filter { it.uuid == characteristic.uuid }
        CharacteristicChange.WIFI -> wifiResponseEvents.filter { it.uuid == characteristic.uuid }
        CharacteristicChange.WIFI_STATUS -> wifiStatusResponseEvents.filter { it.uuid == characteristic.uuid }
    }
}

I’ll see what scan and fold will do for me. So if I can use them coupled with single(), does that mean I don’t need the launch()?

there will be a launch() somewhere, but it would not need to be unique to this coroutine

IOW, you can have this be a suspend fun that returns a result
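A minimal sketch of such a suspend fun, assuming the SharedFlow already carries assembled text messages. The parameter names, the sendCommand callback (standing in for the streamCommand call), and the five-second timeout are all hypothetical. It uses first() rather than single(), since single() waits for the flow to complete and a SharedFlow never does. Sending the command inside onSubscription is one way to avoid missing a reply that arrives before collection starts; whether that is related to the behavior Jan saw is not established here.

```kotlin
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical signature: `messages` carries already-assembled responses and
// `sendCommand` stands in for whatever writes the command to the device.
suspend fun validatePin(
    messages: SharedFlow<String>,
    timeoutMillis: Long = 5_000,
    sendCommand: suspend () -> Unit,
): Boolean {
    val reply = withTimeoutOrNull(timeoutMillis) {
        messages
            .onSubscription { sendCommand() } // runs once this collector is registered
            .first()                          // returns after one message and stops collecting
    }
    // A null reply means the device did not answer before the timeout.
    return reply == "success"
}
```

The caller still needs a coroutine to call this from, such as one launched from the UI layer, but the function itself returns a plain Boolean.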

That’d be lovely.

FWIW, in preparation for some future Elements of Kotlin Coroutines update, I have a bunch more flow operator samples up in the Klassbook, including filter(), scan(), and fold()

How does the search work on Klassbook? I found the takeWhile example on Klassbook because of a Google search match. But putting takeWhile in the Klassbook search box doesn’t find it. So if I wanted to find scan and fold, should I just use Google?

the simplest thing is to visit the home page and search the page

I do need to improve the tag system on the Klassbook at some point

Oh! Well that is certainly simple!

Yeah, there’s a "Flows and Channels" section that lists all the flows and channels lessons, and the new operator ones are on the end of that section’s bullet liet

er, list

I walked through a lot of that last night! It was very helpful! Thanks for all your work on CommonsWare. Have a great day!

you too!

I don’t know how to exit this new chat system.

Do I just close the window?

yeah, you just close the window/tab/whatever