aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/virtiofsd/fuse_virtio.c359
1 files changed, 201 insertions, 158 deletions
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f6242f9338..0dcf2ef57a 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -22,6 +22,7 @@
#include <assert.h>
#include <errno.h>
+#include <glib.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -37,17 +38,28 @@
struct fv_VuDev;
struct fv_QueueInfo {
pthread_t thread;
+ /*
+ * This lock protects the VuVirtq preventing races between
+ * fv_queue_thread() and fv_queue_worker(). It is also taken by
+ * virtio_send_msg() and virtio_send_data_iov() around their
+ * vu_queue_push()/vu_queue_notify() calls.
+ */
+ pthread_mutex_t vq_lock;
+
struct fv_VuDev *virtio_dev;
/* Our queue index, corresponds to array position */
int qidx;
int kick_fd;
int kill_fd; /* For killing the thread */
+};
- /* The element for the command currently being processed */
- VuVirtqElement *qe;
+/*
+ * A FUSE request.
+ *
+ * fv_queue_thread() obtains one of these per virtqueue element by calling
+ * vu_queue_pop(dev, q, sizeof(FVRequest)) and uses the returned pointer
+ * directly as an FVRequest; fv_queue_worker() frees it when done.
+ * virtio_send_msg() and virtio_send_data_iov() recover the request from
+ * the fuse_chan pointer they are given via container_of(ch, FVRequest, ch).
+ */
+typedef struct {
+ VuVirtqElement elem;
+ struct fuse_chan ch;
+
+ /*
+ * Used to complete requests that involve no reply: set to true once a
+ * reply has been pushed; if it is still false after processing,
+ * fv_queue_worker() pushes the element back with length 0.
+ */
bool reply_sent;
-};
+} FVRequest;
/*
* We pass the dev element into libvhost-user
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
struct iovec *iov, int count)
{
- VuVirtqElement *elem;
- VuVirtq *q;
+ FVRequest *req = container_of(ch, FVRequest, ch);
+ struct fv_QueueInfo *qi = ch->qi;
+ VuDev *dev = &se->virtio_dev->dev;
+ VuVirtq *q = vu_get_queue(dev, qi->qidx);
+ VuVirtqElement *elem = &req->elem;
int ret = 0;
assert(count >= 1);
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
/* unique == 0 is notification, which we don't support */
assert(out->unique);
- /* For virtio we always have ch */
- assert(ch);
- assert(!ch->qi->reply_sent);
- elem = ch->qi->qe;
- q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+ assert(!req->reply_sent);
/* The 'in' part of the elem is to qemu */
unsigned int in_num = elem->in_num;
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
}
copy_iov(iov, count, in_sg, in_num, tosend_len);
- vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
- vu_queue_notify(&se->virtio_dev->dev, q);
- ch->qi->reply_sent = true;
+
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+ pthread_mutex_lock(&qi->vq_lock);
+ vu_queue_push(dev, q, elem, tosend_len);
+ vu_queue_notify(dev, q);
+ pthread_mutex_unlock(&qi->vq_lock);
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+ req->reply_sent = true;
err:
return ret;
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
struct iovec *iov, int count, struct fuse_bufvec *buf,
size_t len)
{
+ FVRequest *req = container_of(ch, FVRequest, ch);
+ struct fv_QueueInfo *qi = ch->qi;
+ VuDev *dev = &se->virtio_dev->dev;
+ VuVirtq *q = vu_get_queue(dev, qi->qidx);
+ VuVirtqElement *elem = &req->elem;
int ret = 0;
- VuVirtqElement *elem;
- VuVirtq *q;
assert(count >= 1);
assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
/* unique == 0 is notification which we don't support */
assert(out->unique);
- /* For virtio we always have ch */
- assert(ch);
- assert(!ch->qi->reply_sent);
- elem = ch->qi->qe;
- q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+ assert(!req->reply_sent);
/* The 'in' part of the elem is to qemu */
unsigned int in_num = elem->in_num;
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
ret = 0;
- vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
- vu_queue_notify(&se->virtio_dev->dev, q);
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+ pthread_mutex_lock(&qi->vq_lock);
+ vu_queue_push(dev, q, elem, tosend_len);
+ vu_queue_notify(dev, q);
+ pthread_mutex_unlock(&qi->vq_lock);
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
err:
if (ret == 0) {
- ch->qi->reply_sent = true;
+ req->reply_sent = true;
}
return ret;
}
+/*
+ * Process one FVRequest in a thread pool.
+ *
+ * @data is the FVRequest handed over by fv_queue_thread() through
+ * g_thread_pool_push(); @user_data is the fv_QueueInfo that was given to
+ * g_thread_pool_new().
+ *
+ * The request is copied out of the element's 'out' descriptors into a
+ * private buffer and passed to fuse_session_process_buf_int(). If no reply
+ * was sent the element is pushed back with length 0. The private buffer
+ * and the FVRequest itself are freed before returning.
+ */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+ struct fv_QueueInfo *qi = user_data;
+ struct fuse_session *se = qi->virtio_dev->se;
+ struct VuDev *dev = &qi->virtio_dev->dev;
+ FVRequest *req = data;
+ VuVirtqElement *elem = &req->elem;
+ struct fuse_buf fbuf = {};
+ bool allocated_bufv = false;
+ struct fuse_bufvec bufv;
+ struct fuse_bufvec *pbufv;
+
+ assert(se->bufsize > sizeof(struct fuse_in_header));
+
+ /*
+ * An element contains one request and the space to send our response.
+ * They're spread over multiple descriptors in a scatter/gather set
+ * and we can't trust the guest to keep them still; so copy in/out.
+ */
+ fbuf.mem = malloc(se->bufsize);
+ assert(fbuf.mem);
+
+ fuse_mutex_init(&req->ch.lock);
+ req->ch.fd = -1;
+ req->ch.qi = qi;
+
+ /* The 'out' part of the elem is from qemu */
+ unsigned int out_num = elem->out_num;
+ struct iovec *out_sg = elem->out_sg;
+ size_t out_len = iov_size(out_sg, out_num);
+ fuse_log(FUSE_LOG_DEBUG,
+ "%s: elem %d: with %d out desc of length %zd\n",
+ __func__, elem->index, out_num, out_len);
+
+ /*
+ * The elem should contain a 'fuse_in_header' (in to fuse)
+ * plus the data based on the len in the header.
+ * Both size checks below currently abort via assert(0) rather than
+ * failing the request.
+ */
+ if (out_len < sizeof(struct fuse_in_header)) {
+ fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+ __func__, elem->index);
+ assert(0); /* TODO */
+ }
+ if (out_len > se->bufsize) {
+ fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+ elem->index);
+ assert(0); /* TODO */
+ }
+ /* Copy just the first iovec entry so the opcode can be looked at */
+ copy_from_iov(&fbuf, 1, out_sg);
+
+ pbufv = NULL; /* Compiler thinks there is an uninitialised path */
+ if (out_num > 2 &&
+ out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
+ ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
+ out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
+ /*
+ * For a write we don't actually need to copy the
+ * data, we can just do it straight out of guest memory
+ * but we must still copy the headers in case the guest
+ * was nasty and changed them while we were using them.
+ */
+ fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
+
+ /* copy the fuse_write_in header after the fuse_in_header */
+ fbuf.mem += out_sg->iov_len;
+ copy_from_iov(&fbuf, 1, out_sg + 1);
+ fbuf.mem -= out_sg->iov_len;
+ fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+ /* Allocate the bufv, with space for the rest of the iov */
+ pbufv = malloc(sizeof(struct fuse_bufvec) +
+ sizeof(struct fuse_buf) * (out_num - 2));
+ if (!pbufv) {
+ fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
+ __func__);
+ goto out;
+ }
+
+ allocated_bufv = true;
+ pbufv->count = 1;
+ pbufv->buf[0] = fbuf;
+
+ size_t iovindex, pbufvindex;
+ iovindex = 2; /* 2 headers, separate iovs */
+ pbufvindex = 1; /* 2 headers, 1 fusebuf */
+
+ for (; iovindex < out_num; iovindex++, pbufvindex++) {
+ pbufv->count++;
+ pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
+ pbufv->buf[pbufvindex].flags = 0;
+ pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
+ pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
+ }
+ } else {
+ /* Normal (non fast write) path */
+
+ /* Copy the rest of the buffer, placed after the first iovec entry */
+ fbuf.mem += out_sg->iov_len;
+ copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+ fbuf.mem -= out_sg->iov_len;
+ fbuf.size = out_len;
+
+ /* TODO! Endianness of header */
+
+ /* TODO: Add checks for fuse_session_exited */
+ bufv.buf[0] = fbuf;
+ bufv.count = 1;
+ pbufv = &bufv;
+ }
+ pbufv->idx = 0;
+ pbufv->off = 0;
+ fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+out:
+ if (allocated_bufv) {
+ free(pbufv);
+ }
+
+ /*
+ * If the request has no reply, still recycle the virtqueue element.
+ * This is also the path taken when the pbufv allocation above fails
+ * and the request was never processed.
+ */
+ if (!req->reply_sent) {
+ struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+ fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
+ elem->index);
+
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+ pthread_mutex_lock(&qi->vq_lock);
+ vu_queue_push(dev, q, elem, 0);
+ vu_queue_notify(dev, q);
+ pthread_mutex_unlock(&qi->vq_lock);
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+ }
+
+ pthread_mutex_destroy(&req->ch.lock);
+ free(fbuf.mem);
+ free(req);
+}
+
/* Thread function for individual queues, created when a queue is 'started' */
static void *fv_queue_thread(void *opaque)
{
struct fv_QueueInfo *qi = opaque;
struct VuDev *dev = &qi->virtio_dev->dev;
struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
- struct fuse_session *se = qi->virtio_dev->se;
- struct fuse_chan ch;
- struct fuse_buf fbuf;
+ GThreadPool *pool;
- fbuf.mem = NULL;
- fbuf.flags = 0;
-
- fuse_mutex_init(&ch.lock);
- ch.fd = (int)0xdaff0d111;
- ch.qi = qi;
+ pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+ TRUE, NULL);
+ if (!pool) {
+ fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
+ return NULL;
+ }
fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
qi->qidx, qi->kick_fd);
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
/* Mutual exclusion with virtio_loop() */
ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
assert(ret == 0); /* there is no possible error case */
+ pthread_mutex_lock(&qi->vq_lock);
/* out is from guest, in is too guest */
unsigned int in_bytes, out_bytes;
vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
"%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
__func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
-
while (1) {
- bool allocated_bufv = false;
- struct fuse_bufvec bufv;
- struct fuse_bufvec *pbufv;
-
- /*
- * An element contains one request and the space to send our
- * response They're spread over multiple descriptors in a
- * scatter/gather set and we can't trust the guest to keep them
- * still; so copy in/out.
- */
- VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
- if (!elem) {
+ FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
+ if (!req) {
break;
}
- qi->qe = elem;
- qi->reply_sent = false;
+ req->reply_sent = false;
- if (!fbuf.mem) {
- fbuf.mem = malloc(se->bufsize);
- assert(fbuf.mem);
- assert(se->bufsize > sizeof(struct fuse_in_header));
- }
- /* The 'out' part of the elem is from qemu */
- unsigned int out_num = elem->out_num;
- struct iovec *out_sg = elem->out_sg;
- size_t out_len = iov_size(out_sg, out_num);
- fuse_log(FUSE_LOG_DEBUG,
- "%s: elem %d: with %d out desc of length %zd\n", __func__,
- elem->index, out_num, out_len);
-
- /*
- * The elem should contain a 'fuse_in_header' (in to fuse)
- * plus the data based on the len in the header.
- */
- if (out_len < sizeof(struct fuse_in_header)) {
- fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
- __func__, elem->index);
- assert(0); /* TODO */
- }
- if (out_len > se->bufsize) {
- fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
- __func__, elem->index);
- assert(0); /* TODO */
- }
- /* Copy just the first element and look at it */
- copy_from_iov(&fbuf, 1, out_sg);
-
- if (out_num > 2 &&
- out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
- ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
- out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
- /*
- * For a write we don't actually need to copy the
- * data, we can just do it straight out of guest memory
- * but we must still copy the headers in case the guest
- * was nasty and changed them while we were using them.
- */
- fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
-
- /* copy the fuse_write_in header after the fuse_in_header */
- fbuf.mem += out_sg->iov_len;
- copy_from_iov(&fbuf, 1, out_sg + 1);
- fbuf.mem -= out_sg->iov_len;
- fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
-
- /* Allocate the bufv, with space for the rest of the iov */
- allocated_bufv = true;
- pbufv = malloc(sizeof(struct fuse_bufvec) +
- sizeof(struct fuse_buf) * (out_num - 2));
- if (!pbufv) {
- vu_queue_unpop(dev, q, elem, 0);
- free(elem);
- fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
- __func__);
- goto out;
- }
-
- pbufv->count = 1;
- pbufv->buf[0] = fbuf;
-
- size_t iovindex, pbufvindex;
- iovindex = 2; /* 2 headers, separate iovs */
- pbufvindex = 1; /* 2 headers, 1 fusebuf */
-
- for (; iovindex < out_num; iovindex++, pbufvindex++) {
- pbufv->count++;
- pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
- pbufv->buf[pbufvindex].flags = 0;
- pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
- pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
- }
- } else {
- /* Normal (non fast write) path */
-
- /* Copy the rest of the buffer */
- fbuf.mem += out_sg->iov_len;
- copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
- fbuf.mem -= out_sg->iov_len;
- fbuf.size = out_len;
-
- /* TODO! Endianness of header */
-
- /* TODO: Add checks for fuse_session_exited */
- bufv.buf[0] = fbuf;
- bufv.count = 1;
- pbufv = &bufv;
- }
- pbufv->idx = 0;
- pbufv->off = 0;
- fuse_session_process_buf_int(se, pbufv, &ch);
-
- if (allocated_bufv) {
- free(pbufv);
- }
-
- if (!qi->reply_sent) {
- fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
- __func__, elem->index);
- /* I think we've still got to recycle the element */
- vu_queue_push(dev, q, elem, 0);
- vu_queue_notify(dev, q);
- }
- qi->qe = NULL;
- free(elem);
- elem = NULL;
+ g_thread_pool_push(pool, req, NULL);
}
+ pthread_mutex_unlock(&qi->vq_lock);
pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
}
-out:
- pthread_mutex_destroy(&ch.lock);
- free(fbuf.mem);
+
+ g_thread_pool_free(pool, FALSE, TRUE);
return NULL;
}
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
__func__, qidx, ret);
}
+ pthread_mutex_destroy(&ourqi->vq_lock);
close(ourqi->kill_fd);
ourqi->kick_fd = -1;
free(vud->qi[qidx]);
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
assert(ourqi->kill_fd != -1);
+ pthread_mutex_init(&ourqi->vq_lock, NULL);
+
if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
__func__, qidx);