Crafting Backup as a Service solution using Akka Streams: part 2
Using Akka Streams to stream data from Amazon S3 to MongoDB.
August 21, 2017
This is part two of the “Crafting production-ready Backup as a Service solution using Akka Streams” series. If you haven’t read the first part yet, I highly encourage you to do so, since it gives the overall context.
In the first part, we saw how to perform a data backup from MongoDB to S3 by composing a stream with encryption and compression using Akka Streams and Alpakka.
In this part, we’ll focus on the restoration. We’ll again use Akka Streams and Alpakka, this time to stream the data stored in S3 back to MongoDB. We’ll also face some interesting new challenges, since doing the reverse is not always straightforward ;). Obviously, we’ll do all of this in Scala.
Ready? Let’s start one more time!
The Stream
The diagram below is a high-level overview of the restore stream we will implement.
The stream starts from an S3 bucket (from a file called CookieCollectionBackup, to be precise). In the first part, we applied compression to our backup stream, so we assume that the backup file is compressed. That’s why in the next step we’ll apply decompression. We’ll then convert each document into BSON and finally insert it into a MongoDB collection called cookies.
Now we’re going to build a stream doing the aforementioned operations.
Ingredients
- Akka Streams 2.5.4
- Alpakka S3 connector 0.11
- MongoDB Reactive Streams driver 1.5.0
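To follow along in your own sbt project, the dependencies might be declared roughly as below. The versions come from the list above; the group and artifact IDs are my assumption of the usual Maven Central coordinates, so verify them before relying on this fragment.

```scala
// build.sbt (sketch): versions from the ingredients list, coordinates assumed
libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                    % "2.5.4",
  "com.lightbend.akka" %% "akka-stream-alpakka-s3"         % "0.11",
  "org.mongodb"         % "mongodb-driver-reactivestreams" % "1.5.0"
)
```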
The Code
The full runnable code can be found here: bszwej/mongodb-s3-stream-example. In the README.md you’ll find quick commands for running MongoDB using Docker, as well as basic mongo shell operations that are useful when playing with the backup or restore streams.
We’ll see many interesting things in our restore stream, so we’ll compose it in a step-by-step manner, gradually introducing new stages. Let’s start!
Our stream starts from a file called CookieCollectionBackup, stored on S3 in a bucket called backup.bucket. We use the Alpakka S3 client and call the download method available in its API, passing the names of the bucket and the file where the backup data is located. This method returns a source of ByteStrings representing the contents of our backup file.
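Running against a real bucket requires AWS credentials, so for local experiments it can help to replace the download call with an in-memory source of the same element type. The sketch below assumes that download in Alpakka 0.11 gives a Source[ByteString, NotUsed], as the chained calls in this article suggest; FakeDownload and fakeDownload are hypothetical names, not part of Alpakka. It also shows that the file arrives as a series of arbitrarily sized chunks rather than as one blob.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FakeDownload {
  // Hypothetical stand-in for s3Client.download(bucket, key): emits the file
  // contents as a stream of ByteString chunks of at most `chunkSize` bytes.
  def fakeDownload(contents: ByteString, chunkSize: Int): Source[ByteString, NotUsed] =
    Source(contents.grouped(chunkSize).toList)

  /** Returns (number of chunks emitted, whether the chunks reassemble into the original). */
  def run(): (Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("fake-download")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val file = ByteString("""{"name": "cookie1"}{"name": "cookie2"}""") // 38 bytes
      val chunks = fakeDownload(file, chunkSize = 7) // a blueprint, can be run twice
      val chunkCount = Await.result(chunks.runFold(0)((n, _) => n + 1), 5.seconds)
      val reassembled = Await.result(chunks.runFold(ByteString.empty)(_ ++ _), 5.seconds)
      (chunkCount, reassembled == file)
    } finally system.terminate()
  }
}
```

FakeDownload.run() returns (6, true): the 38-byte file is emitted as six chunks, and concatenating them gives back the original bytes.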
Decompression
So far, we’re able to download the file from S3 and get a source of ByteStrings with its contents. As mentioned before, we assume the file is compressed with gzip, so the next step is to apply decompression.
In this step, we apply the gunzip flow, which performs the actual decompression. Akka Streams comes with a few built-in flows that can be used to compress and decompress ByteStrings. They can be found under akka.stream.scaladsl.Compression. Notice how much is being done in just three lines of code!
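To see that Compression.gunzip() undoes gzip compression, here is a self-contained round trip: it compresses an in-memory ByteString with Compression.gzip (standing in for the compressed backup from part one) and then decompresses it in small, arbitrary chunks, the way bytes would arrive from S3. The object name GunzipRoundTrip and the 64-byte chunk size are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Compression, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object GunzipRoundTrip {
  /** Returns (original size, compressed size, whether gunzip restored the original bytes). */
  def run(): (Int, Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("gunzip-round-trip")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val original = ByteString("""{"name": "cookie1", "delicious": true}""" * 100)

      // Compress the payload, as a gzip-compressed backup would have been.
      val compressed = Await.result(
        Source.single(original).via(Compression.gzip).runFold(ByteString.empty)(_ ++ _),
        5.seconds)

      // Decompress it in 64-byte chunks, as the restore stream does after downloading.
      val restored = Await.result(
        Source(compressed.grouped(64).toList)
          .via(Compression.gunzip())
          .runFold(ByteString.empty)(_ ++ _),
        5.seconds)

      (original.length, compressed.length, restored == original)
    } finally system.terminate()
  }
}
```

The chunk boundaries of the compressed input don’t matter to gunzip; it buffers internally until it can decode.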
Streaming documents to MongoDB
Here comes an interesting part! Let’s materialize our stream now and println each element to see what’s inside.
s3Client
.download("backup.bucket", "CookieCollectionBackup")
.via(Compression.gunzip())
.runWith(Sink.foreach(bs ⇒ println(bs.utf8String)))
Output:
{"_id": {"$oid":"597c301779a802c47729d4d8"}, "name": "cookie1", "delicious": true}{"_id": {"$oid":"597d83b51771b3652d7d049d"}, "name": "cookie2","delicious": true}{"_id": {"$oid":"597d83b91771b3652d7d049e"}, "name": "cookie3", "delicious": true}
The raw output above shows the decompressed contents of the backup file. It consists of many MongoDB documents in Extended JSON format, glued together. At this point, each element of the stream is a chunk of concatenated documents. Can you spot the problem?
The problem is that in the next step we need to convert each Extended JSON document into a BSON document in order to insert it into the database. To do that, we have to take the chunks of concatenated documents and split them so that each element of the stream contains exactly one document; a single document may even be split across chunk boundaries. Fortunately, Akka Streams once again comes with a Flow that does exactly what we need! It’s called akka.stream.scaladsl.JsonFraming#objectScanner. What objectScanner does is well documented:
Returns a Flow that implements a “brace counting” based framing stage for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks.
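To make “brace counting” more concrete, here is a simplified, plain-Scala illustration of the idea. It is not Akka’s implementation: it works on a complete String rather than on a stream of chunks, assumes well-formed input consisting only of objects, and performs no length check. BraceCounting and splitObjects are hypothetical names.

```scala
object BraceCounting {
  /** Splits concatenated top-level JSON objects such as {..}{..} into separate strings. */
  def splitObjects(input: String): Vector[String] = {
    val out = Vector.newBuilder[String]
    var depth = 0        // current brace nesting level
    var start = 0        // index where the current top-level object began
    var inString = false // inside a "..." literal, where braces don't count
    var escaped = false  // previous character was a backslash inside a string
    for (i <- input.indices) {
      val c = input.charAt(i)
      if (inString) {
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else {
        c match {
          case '"' => inString = true
          case '{' =>
            if (depth == 0) start = i
            depth += 1
          case '}' =>
            depth -= 1
            if (depth == 0) out += input.substring(start, i + 1) // a complete object
          case _ => // whitespace and other characters don't affect framing
        }
      }
    }
    out.result()
  }
}
```

For example, BraceCounting.splitObjects("""{"a": "}"}{"b": {"c": 1}}""") returns Vector({"a": "}"}, {"b": {"c": 1}}): the brace inside the string literal is ignored, and the nested object stays inside its parent.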
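objectScanner accepts one parameter, namely maximumObjectLength. As the name says, it’s the maximum length of a single JSON object being framed; if an object grows beyond it, the stage fails the stream. We set it to 16,000,000 because MongoDB allows storing documents of up to 16 megabytes. Strictly speaking, that limit (16,777,216 bytes) applies to the BSON size, which is not the same as the length of the Extended JSON text, so the value is an approximation.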
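The sketch below runs JsonFraming.objectScanner on concatenated documents cut into 10-byte chunks, so that both documents straddle chunk boundaries, and collects the framed elements. An in-memory source stands in for the downloaded and decompressed S3 data; ObjectScannerDemo is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ObjectScannerDemo {
  val maximumObjectLength = 16000000

  /** Frames concatenated JSON objects that arrive in arbitrary 10-byte chunks. */
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("object-scanner-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val concatenated = ByteString(
        """{"name": "cookie1", "delicious": true}{"name": "cookie2", "delicious": true}""")
      val chunks = concatenated.grouped(10).toList // objects straddle chunk boundaries
      Await.result(
        Source(chunks)
          .via(JsonFraming.objectScanner(maximumObjectLength)) // one element per JSON object
          .map(_.utf8String)
          .runWith(Sink.seq[String]),
        5.seconds)
    } finally system.terminate()
  }
}
```

Although no chunk lines up with a document, the result contains exactly the two documents, each as its own element.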
Now let’s materialize our stream again and println each element.
val maximumObjectLength = 16000000
s3Client
.download("backup.bucket", "CookieCollectionBackup")
.via(Compression.gunzip())
.via(JsonFraming.objectScanner(maximumObjectLength))
.runWith(Sink.foreach(bs ⇒ println(bs.utf8String)))
Output:
{"_id": {"$oid": "597c301779a802c47729d4d8"}, "name": "cookie1", "delicious": true}
{"_id": {"$oid": "597d83b51771b3652d7d049d"}, "name": "cookie2", "delicious": true}
{"_id": {"$oid": "597d83b91771b3652d7d049e"}, "name": "cookie3", "delicious": true}
Awesome! Now each element contains a single document. Let’s go ahead and stream the documents to MongoDB.
The snippet above presents the whole solution.
- In line 11, we convert each incoming document encoded as a ByteString to a BSON document using org.bson.Document#parse. Additionally, we use ByteString#utf8String to convert from ByteString to String.
- In line 12, we insert each document into MongoDB. As mentioned, we use the MongoDB Reactive Streams driver, which returns Reactive Streams Publishers. That’s why the return type of insertOne is org.reactivestreams.Publisher. We want to convert it to an Akka Streams source, so we use the Source.fromPublisher helper, which accepts a Reactive Streams publisher and creates an Akka Streams source out of it.
- In line 13, we materialize our stream, ignoring the incoming elements.
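Based on the three steps described in the list above, the complete restore flow could be sketched as follows. This is my reconstruction, not the code from the repository: RestoreSketch, restoreFlow and fakeInsert are hypothetical names, and flatMapConcat is one plausible way to flatten the Publisher returned for each insert. The fake insert stands in for collection.insertOne so the example runs without MongoDB; it still needs the org.bson classes, which come with the MongoDB driver.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Compression, Flow, JsonFraming, Sink, Source}
import akka.util.ByteString
import org.bson.Document
import org.reactivestreams.Publisher

import scala.concurrent.Await
import scala.concurrent.duration._

object RestoreSketch {
  val maximumObjectLength = 16000000

  /** Hypothetical helper: gunzip, frame, parse and insert each document.
    * With the real driver, `insert` would be `doc => collection.insertOne(doc)`. */
  def restoreFlow[R](insert: Document => Publisher[R]): Flow[ByteString, R, NotUsed] =
    Flow[ByteString]
      .via(Compression.gunzip())
      .via(JsonFraming.objectScanner(maximumObjectLength))
      .map(bs => Document.parse(bs.utf8String))                // ByteString -> String -> BSON
      .flatMapConcat(doc => Source.fromPublisher(insert(doc))) // one insert at a time

  /** Runs the flow on an in-memory gzipped backup with a fake insert echoing each name. */
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("restore-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val backup = ByteString(
        """{"_id": {"$oid": "597c301779a802c47729d4d8"}, "name": "cookie1", "delicious": true}""" +
        """{"_id": {"$oid": "597d83b51771b3652d7d049d"}, "name": "cookie2", "delicious": true}""")
      val gzipped = Await.result(
        Source.single(backup).via(Compression.gzip).runFold(ByteString.empty)(_ ++ _),
        5.seconds)

      // Stand-in for collection.insertOne: a Publisher that emits the document's name.
      def fakeInsert(doc: Document): Publisher[String] =
        Source.single(doc.getString("name")).runWith(Sink.asPublisher[String](fanout = false))

      Await.result(
        Source.single(gzipped).via(restoreFlow[String](fakeInsert)).runWith(Sink.seq[String]),
        5.seconds)
    } finally system.terminate()
  }
}
```

Wired to the real clients, it might be used as s3Client.download("backup.bucket", "CookieCollectionBackup").via(RestoreSketch.restoreFlow(doc => collection.insertOne(doc))).runWith(Sink.ignore), where collection is assumed to be the MongoCollection[Document] for cookies. Because flatMapConcat waits for each insert to complete before starting the next, document order is preserved, but throughput is limited to one insert at a time.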
Voilà! The stream is now able to read a file on Amazon S3 with compressed documents and store them in a MongoDB collection.
Optimisation
Right now, the documents are inserted one by one. We can optimize our stream to do bulk inserts: first, we group the incoming documents into sequences using the grouped stage, and then we use bulkWrite to insert each sequence at once. This won’t be covered in this article, but you can see it in action on GitHub: bszwej/mongodb-s3-stream-example.
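The batching idea can be sketched generically: grouped collects up to batchSize elements, and each batch is written with a single Publisher-returning call. BulkInsertSketch, bulkInsertFlow and fakeBulkWrite are hypothetical names, and the fake write simply reports the batch size so the example runs without MongoDB.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.reactivestreams.Publisher

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object BulkInsertSketch {
  /** Hypothetical helper: batches elements with `grouped` and writes each batch with one call. */
  def bulkInsertFlow[T, R](batchSize: Int)(
      insertBatch: immutable.Seq[T] => Publisher[R]): Flow[T, R, NotUsed] =
    Flow[T]
      .grouped(batchSize) // up to batchSize elements; the last batch may be smaller
      .flatMapConcat(batch => Source.fromPublisher(insertBatch(batch))) // one write at a time

  /** Feeds 5 elements through batches of 2 with a fake write that reports the batch size. */
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("bulk-insert-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      def fakeBulkWrite(batch: immutable.Seq[String]): Publisher[Int] =
        Source.single(batch.size).runWith(Sink.asPublisher[Int](fanout = false))

      Await.result(
        Source(List("cookie1", "cookie2", "cookie3", "cookie4", "cookie5"))
          .via(bulkInsertFlow[String, Int](batchSize = 2)(fakeBulkWrite))
          .runWith(Sink.seq[Int]),
        5.seconds)
    } finally system.terminate()
  }
}
```

BulkInsertSketch.run() returns Seq(2, 2, 1). With the MongoDB driver, insertBatch could roughly be batch => collection.bulkWrite(batch.map(doc => new InsertOneModel[Document](doc)).asJava), assuming com.mongodb.client.model.InsertOneModel and scala.collection.JavaConverters._ are imported.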
Summary
Our backup solution is finally complete! Once again, we’ve seen that Akka Streams with Alpakka can solve complex problems, such as compression or splitting a text stream into separate JSON documents, literally out of the box. Doing those things manually would take time and increase both code and test complexity.
What I really wanted to show you in this two-part article is that Akka Streams offers not only a set of basic components for composing streams with backpressure, but also numerous built-in “extras”. They make crafting complex solutions like the one presented in this series significantly easier.
Thanks for reading and happy hAkking :)!
Resources
- https://github.com/bszwej/mongodb-s3-stream-example
- http://developer.lightbend.com/docs/alpakka/current/s3.html
- https://docs.mongodb.com/manual/reference/mongodb-extended-json/
- http://doc.akka.io/api/akka/current/akka/util/ByteString.html
- http://doc.akka.io/api/akka/current/akka/stream/scaladsl/Compression$.html
- http://doc.akka.io/api/akka/current/akka/stream/scaladsl/JsonFraming$.html