wsockd: more support infrastructure for websockets

This commit is contained in:
William Pitcock 2016-04-02 18:14:56 -05:00
parent ec39816b13
commit f297042b0c

View file

@ -276,6 +276,13 @@ conn_mod_write(conn_t * conn, void *data, size_t len)
rb_rawbuf_append(conn->modbuf_out, data, len); rb_rawbuf_append(conn->modbuf_out, data, len);
} }
static void
conn_mod_write_frame(conn_t * conn, void *data, size_t len)
{
if(IsDead(conn)) /* no point in queueing to a dead man */
return;
}
static void static void
conn_plain_write(conn_t * conn, void *data, size_t len) conn_plain_write(conn_t * conn, void *data, size_t len)
{ {
@ -458,7 +465,7 @@ conn_mod_handshake_process(conn_t *conn)
} }
static void static void
conn_mod_handshake_cb(rb_fde_t *fd, void *data) conn_mod_read_cb(rb_fde_t *fd, void *data)
{ {
char inbuf[READBUF_SIZE]; char inbuf[READBUF_SIZE];
conn_t *conn = data; conn_t *conn = data;
@ -479,7 +486,7 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data)
if (length < 0) if (length < 0)
{ {
if (rb_ignore_errno(errno)) if (rb_ignore_errno(errno))
rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn);
else else
close_conn(conn, NO_WAIT, "Connection closed"); close_conn(conn, NO_WAIT, "Connection closed");
@ -492,24 +499,92 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data)
} }
rb_rawbuf_append(conn->modbuf_in, inbuf, length); rb_rawbuf_append(conn->modbuf_in, inbuf, length);
if (!IsKeyed(conn))
conn_mod_handshake_process(conn); conn_mod_handshake_process(conn);
if (length < sizeof(inbuf)) if (length < sizeof(inbuf))
{ {
rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn);
return; return;
} }
} }
} }
static void static bool
conn_mod_read_cb(rb_fde_t *fd, void *data) plain_check_cork(conn_t * conn)
{ {
if(rb_rawbuf_length(conn->modbuf_out) >= 4096)
{
/* if we have over 4k pending outbound, don't read until
* we've cleared the queue */
SetCork(conn);
rb_setselect(conn->plain_fd, RB_SELECT_READ, NULL, NULL);
/* try to write */
conn_mod_write_sendq(conn->mod_fd, conn);
return true;
}
return false;
}
static void
conn_plain_process_recvq(conn_t *conn)
{
char inbuf[READBUF_SIZE];
while (1)
{
size_t dolen = rb_linebuf_get(&conn->plainbuf_in, inbuf, sizeof inbuf, LINEBUF_COMPLETE, LINEBUF_PARSED);
if (!dolen)
break;
conn_mod_write_frame(conn, inbuf, dolen);
}
} }
static void static void
conn_plain_read_cb(rb_fde_t *fd, void *data) conn_plain_read_cb(rb_fde_t *fd, void *data)
{ {
char inbuf[READBUF_SIZE];
conn_t *conn = data;
int length = 0;
if(conn == NULL)
return;
if(IsDead(conn))
return;
if(plain_check_cork(conn))
return;
while(1)
{
if(IsDead(conn))
return;
length = rb_read(conn->plain_fd, inbuf, sizeof(inbuf));
if(length == 0 || (length < 0 && !rb_ignore_errno(errno)))
{
close_conn(conn, NO_WAIT, NULL);
return;
}
if(length < 0)
{
rb_setselect(conn->plain_fd, RB_SELECT_READ, conn_plain_read_cb, conn);
conn_plain_process_recvq(conn);
return;
}
conn->plain_in += length;
(void) rb_linebuf_parse(&conn->plainbuf_in, inbuf, sizeof(inbuf), 0);
if(IsDead(conn))
return;
if(plain_check_cork(conn))
return;
}
} }
static void static void
@ -559,7 +634,7 @@ wsock_process(mod_ctl_t * ctl, mod_ctl_buf_t * ctlb)
if(rb_get_type(conn->plain_fd) == RB_FD_UNKNOWN) if(rb_get_type(conn->plain_fd) == RB_FD_UNKNOWN)
rb_set_type(conn->plain_fd, RB_FD_SOCKET); rb_set_type(conn->plain_fd, RB_FD_SOCKET);
conn_mod_handshake_cb(conn->mod_fd, conn); conn_mod_read_cb(conn->mod_fd, conn);
} }
static void static void