Syncing document to Postgres database

Hi there!

First off- just wanted to say Yjs is awesome- it was everything we were looking for to implement our small scale collaborative note component.

I was looking through the documentation on how to save a document and load a document and haven’t been able to find any clear steps to do so.

I am using React and Quill for the client side.

My backend stack is Django, Channels with Postgres database. I’d specifically like to know how I can save document contents to my database (what format should I be saving it as). Then when its time to load the document again on a new connection, how do I convert that document back to a Y document and send that back to the frontend to consume?

The current consumer looks as follows. Thanks to @anuj for the code provided in a previous topic:

Thanks so much for your time!

class NoteConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.note_id = self.scope['url_route']['kwargs']['note_id']
        self.group_name = f'{self.note_id}'

        self.ydoc = Y.YDoc()
    

        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )

        await self.accept()
        state = Y.encode_state_vector(self.ydoc)
        msg = create_sync_step1_message(state)
        await self.send_message(msg)

    async def send_message(self, bytes_data):
        if not bytes_data:
            return
        # Send message to room group
        await self.channel_layer.group_send(self.group_name, {"type": "chat_message", "message": bytes_data})

    async def chat_message(self, message):
        # encoded_file_data = BytesIO(message.encode("utf-8")).getvalue()

        await self.send(bytes_data=message['message'])

    async def receive(self, text_data=None, bytes_data=None):
        if text_data:
            text = json.loads(text_data)
            auth_token = text.get('auth', None)
            if auth_token:
                await self.auth_user(auth_token)
        if bytes_data:
            await self.send_message(bytes_data)
            update = await self.process_message(bytes_data, self.ydoc)
        # Save this update to database

    async def auth_user(self, auth_token):
        test_auth = get_user(auth_token)
        if test_auth:
            await self.send(text_data="authenticated")
        else:
            await self.send(text_data="authentication-failed")

    async def process_message(self, message: bytes, ydoc: Y.YDoc):
        if message[0] == YMessageType.SYNC:
            message_type = message[1]
            msg = message[2:]
            if message_type == YSyncMessageType.SYNC_STEP1:
                state = read_message(msg)
                update = Y.encode_state_as_update(ydoc, state)
                reply = create_sync_step2_message(update)
                await self.send_message(reply)
            elif message_type in (YSyncMessageType.SYNC_STEP2, YSyncMessageType.SYNC_UPDATE):
                update = read_message(msg)
                Y.apply_update(ydoc, update)
                return update

You can just store the binary blob for now. Of course, nicer way would be to convert the Ydoc into JSON and then convert back on hydrate. But just use the binary updates since it’s the easiest.

Thanks for the reply @TeemuKoivisto! Would it be possible for you to point me to any documentation on how do to this?

I think the conversion to JSON is documented here, but it also looks like this is deprecated:

Thanks again!

I have used yDocToProsemirrorJSON from y-prosemirror package. There is probably one for the Quill package as well.

Thanks again- it doesn’t look like y-quill has anything similar to that based on their repo so may need to either convert to JSON from quill directly or go with the blob suggestion.

Is there any documentation on how to convert the yDoc to and from blob? I wasn’t able to find this either…

Y.encodeStateAsUpdate(doc) returns a Uint8Array of the entire Doc.

Subsequent updates can be saved individually when the doc changes:

doc.on('update', (update: Uint8Array) => {
  // persist update
})

Most databases have some kind of blob type that allows you to save binary data.

Take a look at the source code for y-indexeddb. There is some db-specific logic there, but it also shows the use of Y.encodeStateAsUpdate and how storeUpdate is called whenever an update event is received on the doc.

Lastly, here’s how y-leveldb stores an update:

1 Like

Huh, well it seems you’re out of luck then. You can definitely work out how to convert them into JSON if you want to, but I’d suggest using blobs first and seeing whether it fits your use-case or not. It gets rid of an edge-case where a user would still have old history when you’ve already converted the blob into JSON and destroyed the doc. But definitely you can do it. Maybe also contribute back to the original y-quill package as well :slight_smile:

1 Like

Thanks Raine- that is really helpful! I will give this a shot.

Thats a good point- I didn’t think of that possible issue… Not sure I’m qualified to contribute, but if by chance I get somewhere I will definitely give it a shot! Thanks again!

Hi again- sorry really not sure what I’m doing wrong here. My data flow is now as follows:

  • Through a debounce I am successfully sending stringified document data via Y.encodeStateAsUpdate(doc) to my server and database.
  • Upon refreshing, I am receiving the document from a socket send via my server, converting it back to a Uint8Array and then attempting to apply update:
              if (jsonData.type === "load") {
                const arr: any = Object.values(jsonData.message);
                const uint8arr = new Uint8Array(arr);
                Y.applyUpdate(ydoc, uint8arr);
              }

However, nothing happens when I do this (no error messages, it seemingly goes through). However, the document does not update, and it seems to break sync with a separate browser instance…

Do you have any other tips as to what I might be doing wrong?

Thank you!

applyUpdate and encodeStateAsUpdate are effectively inverses, so I am guessing it’s a decoding problem. I would compare the actual binary with the expected binary at each step to find where it is going wrong.

You could also clone y-leveldb, replace level calls with postgres calls, and swap it into y-websocket. It may sound more complicated, but it has the benefit of focusing you on the leveldb → postgres conversion rather than wrangling yjs primitives.

If only this page of the documentation had been completed… Custom Provider - Yjs Docs

Thanks Raine! Yes you were correct- it was a decoding problem… I missed the paragraph in the documentation on how we are not able to convert Uint8Arrays into JSON without first converting to base 64: Document Updates - Yjs Docs

There is still a little bit of a sync issue between multiple users, but this fixed the loading problem! Thanks again for your help.

1 Like

Hey @mgug01,

I see you’re using Django and Channels.
It seems you’re using a YDoc (via ypy) for every consumer (=client connection). I don’t think that’s how it’s supposed to work as you likely want one single server-side YDoc that’s taking updates from all clients and not separate server-side YDocs for every client.

I just published a project that implements the Yjs sync protocol on Django Channels with a central YDoc worker: GitHub - stefanw/channels-yroom

Maybe it helps!

Wow you’re a legend @stefanw - this repo is super valuable, I’ll need to give it a closer look.
I think you may be correct about my problematic implementation. Currently- even though two or more users are able to connect to the channel, there is a delay in initial connection likely due to the number of Ydocs present…

Below is the consumer I landed on complete with custom authentication and document persistence to my Postgres database. I will need to see how I can adjust to utilize your methodology:

class NoteConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.note_id = self.scope['url_route']['kwargs']['note_id']
        self.group_name = f'{self.note_id}'
        self.document_obj = await get_note(self.note_id)
        self.ydoc = Y.YDoc()

        if self.document_obj:
            await self.channel_layer.group_add(
                self.group_name,
                self.channel_name
            )
            await self.accept()
            state = Y.encode_state_vector(self.ydoc)
            msg = create_sync_step1_message(state)
            await self.send_message(msg)
        else:
            await self.close()

    async def send_message(self, bytes_data):
        if not bytes_data:
            return
        # Send message to room group
        await self.channel_layer.group_send(self.group_name, {"type": "chat_message", "message": bytes_data})

    async def chat_message(self, message):
        await self.send(bytes_data=message['message'])

    async def receive(self, text_data=None, bytes_data=None):
        if text_data:
            text = json.loads(text_data)
            request_type = text['type']
            if "auth" in request_type:
                auth_token = text.get('auth', None)
                if auth_token:
                    await self.auth_user(auth_token)
            if "document_save" in request_type:
                doc_text = text.get('document', None)
                doc_data = text.get('bytesDoc', None)
                await update_note(self.document_obj, doc_text, doc_data)
            if "load" in request_type:
                await self.request_load(self)

        if bytes_data:
            await self.send_message(bytes_data)
            update = await self.process_message(bytes_data, self.ydoc)

    async def request_load(self, data):
        byte_data = await load_document(self.document_obj)
        if byte_data:
            await self.send(text_data=json.dumps(
                {"type": "load", "message": byte_data}))
        else:
            await self.send(text_data=json.dumps({"type": "load", "message": None}))

    async def auth_user(self, auth_token):
        test_auth = get_user(auth_token)
        if test_auth:
            await self.send(text_data=json.dumps({"type": "auth", "message": "authenticated"}))
            return
        else:
            await self.send(text_data="authentication-failed")

    async def process_message(self, message: bytes, ydoc: Y.YDoc):
        if message[0] == YMessageType.SYNC:
            message_type = message[1]
            msg = message[2:]
            if message_type == YSyncMessageType.SYNC_STEP1:
                state = read_message(msg)
                update = Y.encode_state_as_update(ydoc, state)
                reply = create_sync_step2_message(update)
                await self.send_message(reply)
            elif message_type in (YSyncMessageType.SYNC_STEP2, YSyncMessageType.SYNC_UPDATE):
                update = read_message(msg)
                Y.apply_update(ydoc, update)
                return update

The idea of channel-yroom was to have a minimal channel consumer that just forwards y-protocol (sync+awareness) messages to the worker which keeps a central ydoc instance. The y-websocket JS server implementation also only keeps a single ydoc per room and handles all connections in one process, as far as I can tell.

Django Channels is different in that client connections (e.g. consumers) can by design be handled with multiple app processes (on potentially many servers) and a ‘channel layer’ (e.g. Redis) is used for communication between the consumers. So to keep a single server-side Ydoc, I decided to use a channel worker consumer.

The channels-yroom consumer only looks at bytes_data and handles that as y-protocol messages. So you can overwrite .receive and use text_data for your API.

Hi @stefanw! Just this week getting back into trying to solve my issues with my current setup. It seems the connection is a bit inconsistent on our server- sometimes the user will connect to the document successfully, other times it will not without explanation.

I’m going to try channels-yroom as you suggested. When is the YDoc actually created? And is it true that with one central YDoc I’m still able to have users edit and collaborate on separate “documents” that are identified by ID number as shown here?:

async def connect(self):

        query_string = self.scope['query_string'].decode()
        query_string = parse_qs(query_string)
        params = {key: value[0] if len(
            value) == 1 else value for key, value in query_string.items()}

        # self.note_id = self.scope['url_route']['kwargs']['note_id']
        note_id = params.get('note_id', None)
        note_id = note_id.replace('/', '')
        self.note_id = note_id
        self.group_name = f'{self.note_id}'
        self.document_obj = await get_note(self.note_id)
        self.ydoc = Y.YDoc()

        if self.document_obj:
            await self.channel_layer.group_add(
                self.group_name,
                self.channel_name
            )

            await self.accept()
            # await load_document(self.ydoc)
            state = Y.encode_state_vector(self.ydoc)
            msg = create_sync_step1_message(state)
            await self.send_message(msg)
        else:
            await self.close()

Hey @mgug01!

The worker manages all the ‘rooms’ which each have YDoc + Awareness. The YDoc is created when the room is created and loaded from a snapshot if one is present. When the last client disconnects, the room manager waits 30 seconds and then saves a snapshot and evicts the room from memory.

It’s not one central YDoc but rather one central YDoc per room (collaboration space). This is instead of creating a ‘mirror’ YDoc for every client connection on the server which all need to synchronize and handle updates between each other which doesn’t look very efficient. See here for an explanation and diagram.
The rooms are identified by a string which can contain a namespace for different room settings.