commit 139b90be292c679141ecaf5fae09705a577081d8
parent f0b80e7d35f8836abe3d347a9d82ad4691614856
Author: Oswald Buddenhagen <ossi@users.sf.net>
Date: Sat, 13 Dec 2014 12:09:15 +0100
added support for IMAP DEFLATE
initial patch by Jesse Weaver <pianohacker@gmail.com>, but mostly
rewritten by me.
Diffstat:
6 files changed, 279 insertions(+), 27 deletions(-)
diff --git a/NEWS b/NEWS
@@ -10,6 +10,8 @@ Support for SASL (flexible authentication) has been added.
Support for Windows file systems has been added.
+Support for compressed data transfer has been added.
+
[1.1.0]
Support for hierarchical mailboxes in Patterns.
diff --git a/configure.ac b/configure.ac
@@ -150,6 +150,15 @@ if test "x$ac_cv_berkdb4" = xno; then
AC_MSG_ERROR([Berkley DB >= 4.2 not found.])
fi
+have_zlib=
+AC_CHECK_LIB([z], [deflate],
+ [AC_CHECK_HEADER(zlib.h,
+ [have_zlib=1
+ AC_SUBST([Z_LIBS], ["-lz"])
+ AC_DEFINE([HAVE_LIBZ], 1, [if you have the zlib library])]
+ )]
+)
+
AC_ARG_ENABLE(compat,
AC_HELP_STRING([--disable-compat], [don't include isync compatibility wrapper [no]]),
[ob_cv_enable_compat=$enableval])
@@ -172,4 +181,9 @@ if test -n "$have_sasl_paths"; then
else
AC_MSG_RESULT([Not using SASL])
fi
+if test -n "$have_zlib"; then
+ AC_MSG_RESULT([Using zlib])
+else
+ AC_MSG_RESULT([Not using zlib])
+fi
AC_MSG_RESULT()
diff --git a/src/Makefile.am b/src/Makefile.am
@@ -6,7 +6,7 @@ SUBDIRS = $(compat_dir)
bin_PROGRAMS = mbsync mdconvert
mbsync_SOURCES = main.c sync.c config.c util.c socket.c driver.c drv_imap.c drv_maildir.c
-mbsync_LDADD = -ldb $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS)
+mbsync_LDADD = -ldb $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) $(Z_LIBS)
noinst_HEADERS = common.h config.h driver.h sync.h socket.h
mdconvert_SOURCES = mdconvert.c
diff --git a/src/drv_imap.c b/src/drv_imap.c
@@ -196,7 +196,8 @@ enum CAPABILITY {
UIDPLUS,
LITERALPLUS,
MOVE,
- NAMESPACE
+ NAMESPACE,
+ COMPRESS_DEFLATE
};
static const char *cap_list[] = {
@@ -210,7 +211,8 @@ static const char *cap_list[] = {
"UIDPLUS",
"LITERAL+",
"MOVE",
- "NAMESPACE"
+ "NAMESPACE",
+ "COMPRESS=DEFLATE"
};
#define RESP_OK 0
@@ -1486,6 +1488,9 @@ static void imap_open_store_authenticate2_p2( imap_store_t *, struct imap_cmd *,
static void imap_open_store_namespace( imap_store_t * );
static void imap_open_store_namespace_p2( imap_store_t *, struct imap_cmd *, int );
static void imap_open_store_namespace2( imap_store_t * );
+#ifdef HAVE_LIBZ
+static void imap_open_store_compress_p2( imap_store_t *, struct imap_cmd *, int );
+#endif
static void imap_open_store_finalize( imap_store_t * );
#ifdef HAVE_LIBSSL
static void imap_open_store_ssl_bail( imap_store_t * );
@@ -2041,12 +2046,32 @@ imap_open_store_namespace2( imap_store_t *ctx )
ctx->prefix = nsp_1st_ns->val;
if (!ctx->delimiter)
ctx->delimiter = nfstrdup( nsp_1st_dl->val );
+#ifdef HAVE_LIBZ
+ if (CAP(COMPRESS_DEFLATE)) { /* XXX make that configurable */
+ imap_exec( ctx, 0, imap_open_store_compress_p2, "COMPRESS DEFLATE" );
+ return;
+ }
+#endif
imap_open_store_finalize( ctx );
} else {
imap_open_store_bail( ctx );
}
}
+#ifdef HAVE_LIBZ
+static void
+imap_open_store_compress_p2( imap_store_t *ctx, struct imap_cmd *cmd ATTR_UNUSED, int response )
+{
+ if (response == RESP_NO) {
+ /* We already reported an error, but it's not fatal to us. */
+ imap_open_store_finalize( ctx );
+ } else if (response == RESP_OK) {
+ socket_start_deflate( &ctx->conn );
+ imap_open_store_finalize( ctx );
+ }
+}
+#endif
+
static void
imap_open_store_finalize( imap_store_t *ctx )
{
diff --git a/src/socket.c b/src/socket.c
@@ -280,6 +280,43 @@ static void start_tls_p3( conn_t *conn, int ok )
#endif /* HAVE_LIBSSL */
+#ifdef HAVE_LIBZ
+
+static void z_fake_cb( void * );
+
+void
+socket_start_deflate( conn_t *conn )
+{
+ int result;
+
+ conn->in_z = nfcalloc( sizeof(*conn->in_z) );
+ result = inflateInit2(
+ conn->in_z,
+ -15 /* Use raw deflate */
+ );
+ if (result != Z_OK) {
+ error( "Fatal: Cannot initialize decompression: %s\n", conn->in_z->msg );
+ abort();
+ }
+
+ conn->out_z = nfcalloc( sizeof(*conn->out_z) );
+ result = deflateInit2(
+ conn->out_z,
+ Z_DEFAULT_COMPRESSION, /* Compression level */
+ Z_DEFLATED, /* Only valid value */
+ -15, /* Use raw deflate */
+ 8, /* Default memory usage */
+ Z_DEFAULT_STRATEGY /* Don't try to do anything fancy */
+ );
+ if (result != Z_OK) {
+ error( "Fatal: Cannot initialize compression: %s\n", conn->out_z->msg );
+ abort();
+ }
+
+ init_wakeup( &conn->z_fake, z_fake_cb, conn );
+}
+#endif /* HAVE_LIBZ */
+
static void socket_fd_cb( int, void * );
static void socket_fake_cb( void * );
@@ -501,29 +538,47 @@ socket_close( conn_t *sock )
wipe_wakeup( &sock->ssl_fake );
}
#endif
+#ifdef HAVE_LIBZ
+ if (sock->in_z) {
+ inflateEnd( sock->in_z );
+ free( sock->in_z );
+ sock->in_z = 0;
+ deflateEnd( sock->out_z );
+ free( sock->out_z );
+ sock->out_z = 0;
+ wipe_wakeup( &sock->z_fake );
+ }
+#endif
while (sock->write_buf)
dispose_chunk( sock );
free( sock->append_buf );
sock->append_buf = 0;
}
-static void
-socket_fill( conn_t *sock )
+static int
+prepare_read( conn_t *sock, char **buf, int *len )
{
- char *buf;
int n = sock->offset + sock->bytes;
- int len = sizeof(sock->buf) - n;
- if (!len) {
+ if (!(*len = sizeof(sock->buf) - n)) {
error( "Socket error: receive buffer full. Probably protocol error.\n" );
socket_fail( sock );
- return;
+ return -1;
}
+ *buf = sock->buf + n;
+ return 0;
+}
+
+static int
+do_read( conn_t *sock, char *buf, int len )
+{
+ int n;
+
assert( sock->fd >= 0 );
- buf = sock->buf + n;
#ifdef HAVE_LIBSSL
if (sock->ssl) {
if ((n = ssl_return( "read from", sock, SSL_read( sock->ssl, buf, len ) )) <= 0)
- return;
+ return n;
+
if (n == len && SSL_pending( sock->ssl ))
conf_wakeup( &sock->ssl_fake, 0 );
} else
@@ -532,15 +587,71 @@ socket_fill( conn_t *sock )
if ((n = read( sock->fd, buf, len )) < 0) {
sys_error( "Socket error: read from %s", sock->name );
socket_fail( sock );
- return;
} else if (!n) {
error( "Socket error: read from %s: unexpected EOF\n", sock->name );
socket_fail( sock );
- return;
+ return -1;
}
}
- sock->bytes += n;
- sock->read_callback( sock->callback_aux );
+
+ return n;
+}
+
+#ifdef HAVE_LIBZ
+static void
+socket_fill_z( conn_t *sock )
+{
+ char *buf;
+ int len;
+
+ if (prepare_read( sock, &buf, &len ) < 0)
+ return;
+
+ sock->in_z->avail_out = len;
+ sock->in_z->next_out = (unsigned char *)buf;
+
+ if (inflate( sock->in_z, Z_SYNC_FLUSH ) != Z_OK) {
+ error( "Error decompressing data from %s: %s\n", sock->name, sock->in_z->msg );
+ socket_fail( sock );
+ return;
+ }
+
+ if (!sock->in_z->avail_out)
+ conf_wakeup( &sock->z_fake, 0 );
+
+ if ((len = (char *)sock->in_z->next_out - buf)) {
+ sock->bytes += len;
+ sock->read_callback( sock->callback_aux );
+ }
+}
+#endif
+
+static void
+socket_fill( conn_t *sock )
+{
+#ifdef HAVE_LIBZ
+ if (sock->in_z) {
+ /* The timer will preempt reads until the buffer is empty. */
+ assert( !sock->in_z->avail_in );
+ sock->in_z->next_in = (uchar *)sock->z_buf;
+ if ((sock->in_z->avail_in = do_read( sock, sock->z_buf, sizeof(sock->z_buf) )) <= 0)
+ return;
+ socket_fill_z( sock );
+ } else
+#endif
+ {
+ char *buf;
+ int len;
+
+ if (prepare_read( sock, &buf, &len ) < 0)
+ return;
+
+ if ((len = do_read( sock, buf, len )) <= 0)
+ return;
+
+ sock->bytes += len;
+ sock->read_callback( sock->callback_aux );
+ }
}
int
@@ -655,6 +766,49 @@ do_append( conn_t *conn, buff_chunk_t *bc )
* sufficiently small to keep SSL latency low with a slow uplink. */
#define WRITE_CHUNK_SIZE 1024
+static void
+do_flush( conn_t *conn )
+{
+ buff_chunk_t *bc = conn->append_buf;
+#ifdef HAVE_LIBZ
+ if (conn->out_z) {
+ int buf_avail = conn->append_avail;
+ do {
+ if (!bc) {
+ buf_avail = WRITE_CHUNK_SIZE;
+ bc = nfmalloc( offsetof(buff_chunk_t, data) + buf_avail );
+ bc->len = 0;
+ }
+ conn->out_z->next_in = Z_NULL;
+ conn->out_z->avail_in = 0;
+ conn->out_z->next_out = (uchar *)bc->data + bc->len;
+ conn->out_z->avail_out = buf_avail;
+ if (deflate( conn->out_z, Z_PARTIAL_FLUSH ) != Z_OK) {
+ error( "Fatal: Compression error: %s\n", conn->out_z->msg );
+ abort();
+ }
+ bc->len = (char *)conn->out_z->next_out - bc->data;
+ if (bc->len) {
+ do_append( conn, bc );
+ bc = 0;
+ buf_avail = 0;
+ } else {
+ buf_avail = conn->out_z->avail_out;
+ }
+ } while (!conn->out_z->avail_out);
+ conn->append_buf = bc;
+ conn->append_avail = buf_avail;
+ } else
+#endif
+ if (bc) {
+ do_append( conn, bc );
+ conn->append_buf = 0;
+#ifdef HAVE_LIBZ
+ conn->append_avail = 0;
+#endif
+ }
+}
+
int
socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
{
@@ -663,29 +817,54 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
for (i = 0; i < iovcnt; i++)
total += iov[i].len;
- bc = conn->append_buf;
- if (bc && total >= WRITE_CHUNK_SIZE) {
+ if (total >= WRITE_CHUNK_SIZE) {
/* If the new data is too big, queue the pending buffer to avoid latency. */
- do_append( conn, bc );
- bc = 0;
+ do_flush( conn );
}
+ bc = conn->append_buf;
+#ifdef HAVE_LIBZ
+ buf_avail = conn->append_avail;
+#endif
while (total) {
if (!bc) {
+ /* We don't do anything special when compressing, as there is no way to
+ * predict a reasonable output buffer size anyway - deflatePending() does
+ * not account for consumed but not yet compressed input, and adding up
+ * the deflateBound()s would be a tad *too* pessimistic. */
buf_avail = total > WRITE_CHUNK_SIZE ? total : WRITE_CHUNK_SIZE;
bc = nfmalloc( offsetof(buff_chunk_t, data) + buf_avail );
bc->len = 0;
+#ifndef HAVE_LIBZ
} else {
/* A pending buffer will always be of standard size - over-sized
* buffers are immediately filled and queued. */
buf_avail = WRITE_CHUNK_SIZE - bc->len;
+#endif
}
while (total) {
len = iov->len - offset;
- if (len > buf_avail)
- len = buf_avail;
- memcpy( bc->data + bc->len, iov->buf + offset, len );
- bc->len += len;
- buf_avail -= len;
+#ifdef HAVE_LIBZ
+ if (conn->out_z) {
+ conn->out_z->next_in = (uchar *)iov->buf + offset;
+ conn->out_z->avail_in = len;
+ conn->out_z->next_out = (uchar *)bc->data + bc->len;
+ conn->out_z->avail_out = buf_avail;
+ if (deflate( conn->out_z, Z_NO_FLUSH ) != Z_OK) {
+ error( "Fatal: Compression error: %s\n", conn->out_z->msg );
+ abort();
+ }
+ bc->len = (char *)conn->out_z->next_out - bc->data;
+ buf_avail = conn->out_z->avail_out;
+ len -= conn->out_z->avail_in;
+ } else
+#endif
+ {
+ if (len > buf_avail)
+ len = buf_avail;
+ memcpy( bc->data + bc->len, iov->buf + offset, len );
+ bc->len += len;
+ buf_avail -= len;
+ }
offset += len;
total -= len;
if (offset == iov->len) {
@@ -702,8 +881,16 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
}
}
conn->append_buf = bc;
+#ifdef HAVE_LIBZ
+ conn->append_avail = buf_avail;
+#endif
/* Queue the pending write once the main loop goes idle. */
- conf_wakeup( &conn->fd_fake, bc ? 0 : -1 );
+ conf_wakeup( &conn->fd_fake,
+#ifdef HAVE_LIBZ
+ /* Always give zlib a chance to flush its internal buffer. */
+ conn->out_z ||
+#endif
+ bc ? 0 : -1 );
/* If no writes were queued before, ensure that flushing commences. */
if (!exwb)
return do_queued_write( conn );
@@ -763,13 +950,22 @@ socket_fake_cb( void *aux )
conn_t *conn = (conn_t *)aux;
buff_chunk_t *exwb = conn->write_buf;
- do_append( conn, conn->append_buf );
- conn->append_buf = 0;
+ do_flush( conn );
/* If no writes were queued before, ensure that flushing commences. */
if (!exwb)
do_queued_write( conn );
}
+#ifdef HAVE_LIBZ
+static void
+z_fake_cb( void *aux )
+{
+ conn_t *conn = (conn_t *)aux;
+
+ socket_fill_z( conn );
+}
+#endif
+
#ifdef HAVE_LIBSSL
static void
ssl_fake_cb( void *aux )
diff --git a/src/socket.h b/src/socket.h
@@ -25,6 +25,10 @@
#include "common.h"
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
#ifdef HAVE_LIBSSL
typedef struct ssl_st SSL;
typedef struct ssl_ctx_st SSL_CTX;
@@ -76,6 +80,10 @@ typedef struct {
SSL *ssl;
wakeup_t ssl_fake;
#endif
+#ifdef HAVE_LIBZ
+ z_streamp in_z, out_z;
+ wakeup_t z_fake;
+#endif
void (*bad_callback)( void *aux ); /* async fail while sending or listening */
void (*read_callback)( void *aux ); /* data available for reading */
@@ -92,6 +100,9 @@ typedef struct {
/* writing */
buff_chunk_t *append_buf; /* accumulating buffer */
buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+#ifdef HAVE_LIBZ
+ int append_avail; /* space left in accumulating buffer */
+#endif
int write_offset; /* offset into buffer head */
/* reading */
@@ -99,6 +110,9 @@ typedef struct {
int bytes; /* number of filled bytes in buffer */
int scanoff; /* offset to continue scanning for newline at, relative to 'offset' */
char buf[100000];
+#ifdef HAVE_LIBZ
+ char z_buf[100000];
+#endif
} conn_t;
/* call this before doing anything with the socket */
@@ -120,6 +134,7 @@ static INLINE void socket_init( conn_t *conn,
}
void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) );
void socket_start_tls(conn_t *conn, void (*cb)( int ok, void *aux ) );
+void socket_start_deflate( conn_t *conn );
void socket_close( conn_t *sock );
int socket_read( conn_t *sock, char *buf, int len ); /* never waits */
char *socket_read_line( conn_t *sock ); /* don't free return value; never waits */