Replying to Cesar Dias

Working with webSocketSubjects has been very easy and flexible. I can even push messages before the websocket is connected, and the subject will buffer them and send them once it connects.

webSocketSubjects, just like the other subjects, are multicast.

This will only open one connection no matter how many subscriptions you have:

```ts
const relay = webSocketSubject('wss://relay1.com')
relay.subscribe()
relay.subscribe()
```

Creating multiple webSocketSubjects, on the other hand, will open multiple connections.

A webSocketSubject's multiplex makes a unicast subscription, which lets you run multiple nostr subscriptions over the same connection.

```ts
// subId and filter stand in for values missing from the original snippet
const relay = webSocketSubject('wss://relay1.com')
const sub = relay.multiplex(() => ['REQ', subId, filter], () => ['CLOSE', subId], (msg) => msg[1] === subId)
sub.subscribe()
sub.subscribe()
```

You can pass the webSocketSubject around as you wish, since it's also an observable. I store relays as webSocketSubjects in a Map in the Pool, which doesn't do anything special; it just blacklists relays with errors so it doesn't try to connect to them again.

Currently my core/pool does the relay subscription, which is incorrect. The consumer should be responsible for initiating the relay connection, because they could apply any policy mechanism to a particular relay, such as an rxjs retry to reconnect or some auth policy. Right now my pool exposes an open callback, and we could do something like this (pseudo code)
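A minimal sketch of the consumer-owned policy idea, assuming a hypothetical `openRelay` factory in place of the pool's open callback (the name, the signature, and the simulated failure are all illustrative, not the pool's actual API). The point is only that the reconnect policy is attached by the consumer, not by the pool:

```typescript
import { Observable, defer, of, throwError } from 'rxjs'
import { retry } from 'rxjs/operators'

type RelayMessage = unknown[]

// Hypothetical stand-in for a relay connection: the first attempt
// fails, the second one delivers a single message and completes.
let attempts = 0
const openRelay = (url: string): Observable<RelayMessage> =>
  defer(() => {
    attempts += 1
    return attempts === 1
      ? throwError(() => new Error(`could not connect to ${url}`))
      : of<RelayMessage>(['NOTICE', `connected to ${url}`])
  })

// The consumer decides the policy: here, resubscribe up to 2 times on error.
const received: RelayMessage[] = []
openRelay('wss://relay1.com')
  .pipe(retry(2))
  .subscribe({
    next: (msg) => received.push(msg),
    error: (err) => console.error(err),
  })
```

An auth policy could presumably be attached in the same place, as another operator in the consumer's `pipe`.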

I am still working on auth and trying to think of a flexible way to do it. I'm a little hesitant to replay old messages after a successful auth, as I'd need to store these messages somewhere, but here's a small authenticator (which I think I should rename to createAuthPolicy or something) that takes a pool, a signer, and options like whitelist, which is an observable, since the user can add to the whitelist after the auth was received.

Still half-baked, as I am still testing locally.

Yeah, the tricky part seems to be intercepting send messages so that you can buffer them and re-send after authentication (or remove reqs that have been closed before they were sent while you were waiting for auth). Here's my (half-baked) policy for that:

```ts
export const socketPolicyDeferOnAuth = (socket: Socket) => {
  const send$ = socket.send$
  const buffer: ClientMessage[] = []
  const authState = new AuthState(socket)
  const okStatuses = [AuthStatus.None, AuthStatus.Ok]

  // Defer sending certain messages when we're not authenticated
  socket.send$ = send$.pipe(
    filter(message => {
      // Always allow sending auth
      if (isClientAuth(message)) return true

      // Always allow sending join requests
      if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return true

      // If we're not ok, remove the message and save it for later
      if (!okStatuses.includes(authState.status)) {
        buffer.push(message)

        return false
      }

      return true
    }),
  )

  // Send buffered messages when we get successful auth
  return authState.subscribe((status: AuthStatus) => {
    if (okStatuses.includes(status)) {
      const reqs = new Set(buffer.filter(isClientReq).map(nth(1)))
      const closed = new Set(buffer.filter(isClientClose).map(nth(1)))

      for (const message of buffer.splice(0)) {
        // Skip requests that were closed before they were sent
        if (isClientReq(message) && closed.has(message[1])) continue

        // Skip closes for requests that were never sent
        if (isClientClose(message) && reqs.has(message[1])) continue

        socket.send(message)
      }
    }
  })
}
```
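The pruning done in the flush step above can be looked at in isolation. This is a dependency-free sketch of the same rule: a buffered REQ and its buffered CLOSE cancel each other out. `pruneBuffer`, `isReq`, `isClose`, and the simplified `ClientMessage` type are illustrative names, not the helpers used in the policy itself.

```typescript
// Simplified message shape: [type, ...payload]
type ClientMessage = [string, ...unknown[]]

const isReq = (m: ClientMessage) => m[0] === 'REQ'
const isClose = (m: ClientMessage) => m[0] === 'CLOSE'

// Returns the buffered messages that still need to be sent after auth succeeds.
const pruneBuffer = (buffer: ClientMessage[]): ClientMessage[] => {
  const reqs = new Set(buffer.filter(isReq).map((m) => m[1]))
  const closed = new Set(buffer.filter(isClose).map((m) => m[1]))

  return buffer.filter((m) => {
    // A REQ that was closed while waiting for auth is never sent
    if (isReq(m) && closed.has(m[1])) return false
    // A CLOSE whose REQ never left the buffer is not needed either
    if (isClose(m) && reqs.has(m[1])) return false
    return true
  })
}

const pending: ClientMessage[] = [
  ['REQ', 'a', { kinds: [1] }],
  ['REQ', 'b', { kinds: [0] }],
  ['CLOSE', 'a'],
  ['EVENT', { kind: 1 }],
]

// Keeps REQ 'b' and the EVENT; REQ 'a' and CLOSE 'a' cancel out
const toSend = pruneBuffer(pending)
```

One limitation of this rule: a subscription id that is opened, closed, and opened again while waiting for auth is dropped entirely, since all three messages match.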

Discussion

Thanks for the code, just doing some quick code review here.

The first issue I spotted is reassigning the send$ stream to a pipe of itself (`socket.send$ = send$.pipe(...)`). Try to think of some other way to do that; it's very easy to compose multiple observables that derive from the same source in a functional style.

I would do something like this in socket.ts:

```ts
this.socket = webSocketSubject('...')
// Derived streams, each filtering the same source by message type
this.events$ = this.socket.pipe(filter(msg => msg[0] === 'EVENT'))
this.auths$ = this.socket.pipe(filter(msg => msg[0] === 'AUTH'))
this.authsJoin$ = this.auths$.pipe(filter(msg => msg[1].kind === 28934))
this.oks$ = this.socket.pipe(filter(msg => msg[0] === 'OK'))
```

I would also replace the `authState.subscribe` with something like `return authState.pipe(tap(() => {}))`, but AuthState is a subject with a bunch of subscriptions inside it... hmm, that's a bit bad. Looking at the commit history, you basically ported an event emitter system to rxjs, right? You're going to have to embrace some heavy changes to see the gains of rxjs.
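As a rough illustration of returning an observable instead of subscribing inside the policy, here is a small sketch. The `AuthStatus` values, `flushOnAuth`, and the `flush` callback are hypothetical stand-ins and not the actual AuthState API; a plain `Subject` plays the role of the auth state.

```typescript
import { Observable, Subject } from 'rxjs'
import { filter, tap } from 'rxjs/operators'

// Hypothetical status values; the real AuthStatus enum may differ.
type AuthStatus = 'none' | 'pending' | 'ok' | 'denied'

// Describes the side effect without running it: nothing happens
// until the caller subscribes to the returned observable.
const flushOnAuth = (status$: Observable<AuthStatus>, flush: () => void) =>
  status$.pipe(
    filter((status) => status === 'ok'),
    tap(() => flush()),
  )

let flushes = 0
const status$ = new Subject<AuthStatus>()
const policy$ = flushOnAuth(status$, () => {
  flushes += 1
})

status$.next('ok') // ignored: nobody has subscribed to the policy yet
const policySub = policy$.subscribe()
status$.next('pending') // filtered out
status$.next('ok') // flush runs once
policySub.unsubscribe()
status$.next('ok') // ignored: the policy was torn down with its subscriber
```

Because the caller owns the subscription, the policy goes away together with whatever stream it is composed into.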

Try to think of it this way: say you have a single nostr subscription to a single relay. This subscription has all sorts of complex policies attached to the main stream, and the main stream is also connected to other subscription streams, like fetching the user for each note in the main stream, each with its own set of policies. When you call unsubscribe() on the main stream, the entire pipeline is unsubscribed and garbage collected, with nothing left behind. With your current approach I don't see this happening: you might end up with some parts of the stream still listening for changes, unless you complete/unsubscribe the streams manually in multiple places, which is a lot of work, and is exactly what you have to do with event emitters.
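The teardown behavior described above can be observed with a small sketch. Here two `Subject`s stand in for relay streams, and the `Note` and `Profile` types are made up for the example; `finalize` just records when each part of the pipeline is torn down.

```typescript
import { Subject } from 'rxjs'
import { filter, finalize, map, mergeMap } from 'rxjs/operators'

type Note = { id: string; pubkey: string }
type Profile = { pubkey: string; name: string }

const tornDown: string[] = []
const notes$ = new Subject<Note>()
const profiles$ = new Subject<Profile>()

// Main stream: for every note, open an inner stream for its author's profile.
const feed$ = notes$.pipe(
  mergeMap((note) =>
    profiles$.pipe(
      filter((profile) => profile.pubkey === note.pubkey),
      map((profile) => ({ note, profile })),
      // Runs when this inner stream is unsubscribed or completes
      finalize(() => tornDown.push(`profile:${note.pubkey}`)),
    ),
  ),
  finalize(() => tornDown.push('feed')),
)

const feedSub = feed$.subscribe()
notes$.next({ id: '1', pubkey: 'alice' })
notes$.next({ id: '2', pubkey: 'bob' })

// One call tears down the outer stream and both inner profile streams.
feedSub.unsubscribe()
```

After the single `unsubscribe()`, `tornDown` holds an entry for the feed and one for each inner profile stream, with no manual cleanup of the inner subscriptions.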

I also made these exact mistakes a year ago, so I really don't judge you 😅

This Angular tutorial describes the issue really well:

https://www.youtube.com/watch?v=bxslNFZvOvQ

Yeah, I think I understand all that in the abstract, I'm just finding it quite hard to actually execute, partly because of the duplex nature of websockets, which is what I've been focusing on so far, and partly due to the forking nature of streams, which makes me unsure which subscriptions need to be managed and which don't.

RxJS is a mental journey, so embrace it haha. I know you do a bunch of functional programming throughout the coracle code; rxjs will just fit if you embrace its declarative approach. For example, your `await tryCatch` function is kind of like rxjs `catchError`.

If you have any other questions or want to schedule a call, just let me know.
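For comparison, a small sketch of `catchError` doing the job a try/catch wrapper would do. The `tryCatch` signature isn't shown in this thread, so this only illustrates the rxjs side; `parseFrames` and the sample frames are made up for the example.

```typescript
import { Observable, of } from 'rxjs'
import { catchError, map } from 'rxjs/operators'

// Parse raw relay frames; on the first malformed frame, emit null
// instead of letting the error reach the subscriber.
const parseFrames = (raw$: Observable<string>): Observable<unknown> =>
  raw$.pipe(
    map((raw) => JSON.parse(raw) as unknown),
    catchError(() => of(null)),
  )

const parsed: unknown[] = []
parseFrames(of('["OK","abc",true,""]', 'not json', '["EOSE","sub1"]'))
  .subscribe((value) => parsed.push(value))
```

One difference from a per-call try/catch: `catchError` replaces the rest of the stream with the fallback, so the third frame here is never parsed. Putting the `catchError` inside an inner pipe (for example within a `mergeMap`) keeps the outer stream alive.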

Thanks, I might just do that. For now, I'm probably going to go with the emitter version since I really need to get this refactor done. But let me know when you publish the rxjs relay stuff, I'd love to learn/contribute there.