Helpers to stream data between vert.x ReadStream (e.g. HttpServerFileUpload), WriteStream (e.g. HttpServerResponse) and MongoDB AsyncInputStream and AsyncOutputStream.
This library is no longer maintained and will not receive any updates, for the following reasons:
- the mongodb driver will remove support for callback style invocations
- we no longer use GridFS to store binary data
Prerequisites:
- Java 8
- mongodb async driver >= 3.10.0
- vert.x >= 3.4.0
mvn:
<dependency>
  <groupId>com.github.st-h</groupId>
  <artifactId>vertx-mongo-streams</artifactId>
  <version>2.1.0</version>
</dependency>
gradle:
com.github.st-h:vertx-mongo-streams:2.1.0
The GridFSInputStream factory method GridFSInputStream.create(vertx) now requires the vertx instance as an argument. This is needed to ensure that the drainHandler of the vert.x WriteStream is called within the correct context. When the instance is created, the current vert.x context is stored and restored when needed. Therefore a GridFSInputStream instance should not be cached; create a new one for each upload. Please see this blog post for details about the vert.x context.
Since vert.x 3.4.0, usage from Java and Groovy is identical.
The GridFSInputStream lets you pump data directly from a vert.x ReadStream (e.g. HttpServerFileUpload) into a MongoDB AsyncInputStream.
Create a new instance using the GridFSInputStream.create() method and use a Pump to transfer the data. Call the end() method when all data has been made available. The internal queue size can be changed using the setWriteQueueMaxSize() method.
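To illustrate why the queue size and the drainHandler matter, here is a minimal, dependency-free sketch of the back-pressure handshake that a pump performs between a source and a bounded sink. It is a toy model, not the library's implementation: `BoundedSink`, `Source` and `pumpChunk` are hypothetical names, the queue counts chunks rather than bytes, and the rule for when the drain handler fires is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of pump-style back pressure; NOT the library's implementation. */
public class BackPressureSketch {

    /** Hypothetical stand-in for a WriteStream with a bounded internal queue. */
    static class BoundedSink {
        private final Deque<byte[]> queue = new ArrayDeque<>();
        private int maxSize; // assumption: counted in chunks, not bytes
        private Runnable drainHandler;

        BoundedSink(int maxSize) { this.maxSize = maxSize; }

        void setWriteQueueMaxSize(int maxSize) { this.maxSize = maxSize; }

        void write(byte[] chunk) { queue.add(chunk); }

        boolean writeQueueFull() { return queue.size() >= maxSize; }

        void drainHandler(Runnable handler) { this.drainHandler = handler; }

        /** Simulates the consumer (e.g. a database driver) taking one chunk. */
        byte[] consume() {
            boolean wasFull = writeQueueFull();
            byte[] chunk = queue.poll();
            // Assumption: notify once, as soon as the queue is no longer full.
            if (wasFull && !writeQueueFull() && drainHandler != null) {
                Runnable handler = drainHandler;
                drainHandler = null;
                handler.run();
            }
            return chunk;
        }
    }

    /** Hypothetical stand-in for a pausable ReadStream. */
    static class Source {
        boolean paused;
        void pause() { paused = true; }
        void resume() { paused = false; }
    }

    /** Per chunk: write it, then pause the source if the sink is full. */
    static void pumpChunk(Source source, BoundedSink sink, byte[] chunk) {
        sink.write(chunk);
        if (sink.writeQueueFull()) {
            source.pause();
            sink.drainHandler(source::resume);
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        BoundedSink sink = new BoundedSink(2);
        pumpChunk(source, sink, new byte[] {1});
        System.out.println("paused after 1 chunk: " + source.paused);  // false
        pumpChunk(source, sink, new byte[] {2});
        System.out.println("paused after 2 chunks: " + source.paused); // true
        sink.consume();
        System.out.println("paused after consume: " + source.paused);  // false
    }
}
```

A larger queue lets the source run further ahead of the consumer at the cost of more buffered memory; a smaller one pauses the source sooner.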
This snippet creates a fully working HTTP server that persists an uploaded file to GridFS:
import com.github.sth.vertx.mongo.streams.GridFSInputStream;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.async.client.gridfs.GridFSBucket;
import com.mongodb.async.client.gridfs.GridFSBuckets;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.HttpServer;
import io.vertx.core.streams.Pump;

public class UploadVerticle extends AbstractVerticle {

    private HttpServer httpServer;

    @Override
    public void start(Future<Void> fut) {

        // setup GridFS
        MongoDatabase db = MongoClients.create().getDatabase("test");
        GridFSBucket gridFSBucket = GridFSBuckets.create(db, "test-bucket");

        // setup the HttpServer
        httpServer = vertx.createHttpServer().requestHandler(request -> {

            request.setExpectMultipart(true);

            request.uploadHandler(fileUpload -> {

                // create a new GridFSInputStream for this upload
                GridFSInputStream gridFSInputStream = GridFSInputStream.create(vertx);

                // when the upload has finished, notify the GridFSInputStream
                fileUpload.endHandler(v -> gridFSInputStream.end());

                // just use a Pump to stream all the data
                Pump.pump(fileUpload, gridFSInputStream).start();

                gridFSBucket.uploadFromStream(fileUpload.filename(), gridFSInputStream, (id, t) -> {
                    if (t != null) {
                        // failed to persist
                        request.response().setStatusCode(500).end();
                    } else {
                        // successfully persisted with ID: id
                        request.response().end("uploaded: " + id);
                    }
                });
            });

        }).listen(8080, res -> {
            if (res.succeeded()) {
                fut.complete();
            } else {
                fut.fail(res.cause());
            }
        });
    }

    @Override
    public void stop(Future<Void> fut) {
        httpServer.close(res -> fut.complete());
    }
}
The GridFSOutputStream lets you write to a vert.x WriteStream via the mongo driver's downloadToStream() method:
GridFSOutputStream outputStream = GridFSOutputStream.create(httpServerResponse);
gridFS.downloadToStream(objectId, outputStream, (bytesRead, t) -> {
    ...
});
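For completeness, the download call above could be embedded in a verticle along the following lines. This is a sketch under assumptions, not a tested reference: the class name `DownloadVerticle`, the `id` query parameter, port 8081 and the bucket name are illustrative, `GridFSOutputStream` is assumed to live in the same package as `GridFSInputStream`, and the callback ends the response only if it is still open, because this README does not say whether the stream closes it.

```java
import com.github.sth.vertx.mongo.streams.GridFSOutputStream; // assumed package
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.async.client.gridfs.GridFSBucket;
import com.mongodb.async.client.gridfs.GridFSBuckets;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerResponse;
import org.bson.types.ObjectId;

public class DownloadVerticle extends AbstractVerticle {

    @Override
    public void start() {
        MongoDatabase db = MongoClients.create().getDatabase("test");
        GridFSBucket gridFSBucket = GridFSBuckets.create(db, "test-bucket");

        vertx.createHttpServer().requestHandler(request -> {
            HttpServerResponse response = request.response();

            // hypothetical convention: the file id is passed as ?id=<hex ObjectId>
            String id = request.getParam("id");
            if (id == null || !ObjectId.isValid(id)) {
                response.setStatusCode(400).end();
                return;
            }

            // the total length is not known up front, so use chunked encoding
            response.setChunked(true);

            GridFSOutputStream outputStream = GridFSOutputStream.create(response);
            gridFSBucket.downloadToStream(new ObjectId(id), outputStream, (bytesRead, t) -> {
                if (t != null && !response.headWritten()) {
                    // nothing has been sent yet, so an error status is still possible
                    response.setStatusCode(500);
                }
                if (!response.ended()) {
                    response.end();
                }
            });
        }).listen(8081); // hypothetical port
    }
}
```

With the upload example running on port 8080, an id returned by an upload could then be fetched with a request such as `GET /?id=<id>` against port 8081.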
If you want to build this library yourself, note that the integration test requires a running MongoDB on the default port 27017. If you are using Docker, you can use the commands from .travis.yml to launch a container running a mongo instance and expose the default port:
docker pull mongo
docker run -d -p 127.0.0.1:27017:27017 mongo
Thanks to antofar for contributing an improved implementation of GridFSInputStream.