isync

mailbox synchronization program
git clone https://git.code.sf.net/p/isync/isync
Log | Files | Refs | README | LICENSE

commit 4423a932f3e280a6aa8afd2ae611c9064e840dae
parent be657530ee2fda8605eebf5c049f7eb7af078baf
Author: Oswald Buddenhagen <ossi@users.sf.net>
Date:   Mon, 14 Dec 2020 23:16:01 +0100

add forced async mode to proxy driver

to test async operation of the syncing core while using the synchronous
maildir driver, we add a mode to the proxy driver where it queues
callback invocations to the next main loop iteration.

Diffstat:
Msrc/common.h | 1+
Msrc/driver.h | 7+++++++
Msrc/drv_imap.c | 2+-
Msrc/drv_proxy.c | 147++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Msrc/drv_proxy_gen.pl | 25++++++++++++++++++++-----
Msrc/main.c | 20+++++++++++++-------
Msrc/run-tests.pl | 42++++++++++++++++++++++++++----------------
7 files changed, 196 insertions(+), 48 deletions(-)

diff --git a/src/common.h b/src/common.h @@ -107,6 +107,7 @@ typedef unsigned long ulong; #define VERBOSE 0x800 #define KEEPJOURNAL 0x1000 #define ZERODELAY 0x2000 +#define FORCEASYNC 0x4000 extern int DFlags; extern int JLimit; diff --git a/src/driver.h b/src/driver.h @@ -127,6 +127,9 @@ typedef struct { #define DRV_CANCELED 4 /* All memory belongs to the driver's user, unless stated otherwise. */ +// If the driver is NOT DRV_ASYNC, memory owned by the driver returned +// through callbacks MUST remain valid until a related subsequent command +// is invoked, as the proxy driver may deliver these pointers with delay. /* This flag says that the driver CAN store messages with CRLFs, @@ -138,6 +141,10 @@ typedef struct { This flag says that the driver will act upon (DFlags & VERBOSE). */ #define DRV_VERBOSE 2 +/* + This flag says that the driver operates asynchronously. +*/ +#define DRV_ASYNC 4 #define LIST_INBOX 1 #define LIST_PATH 2 diff --git a/src/drv_imap.c b/src/drv_imap.c @@ -3675,7 +3675,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep ) static uint imap_get_caps( store_t *gctx ATTR_UNUSED ) { - return DRV_CRLF | DRV_VERBOSE; + return DRV_CRLF | DRV_VERBOSE | DRV_ASYNC; } struct driver imap_driver = { diff --git a/src/drv_proxy.c b/src/drv_proxy.c @@ -21,9 +21,12 @@ #include "driver.h" +#include <assert.h> #include <limits.h> #include <stdlib.h> +typedef struct gen_cmd gen_cmd_t; + typedef union proxy_store { store_t gen; struct { @@ -32,6 +35,9 @@ typedef union proxy_store { uint ref_count; driver_t *real_driver; store_t *real_store; + gen_cmd_t *done_cmds, **done_cmds_append; + gen_cmd_t *check_cmds, **check_cmds_append; + wakeup_t wakeup; void (*bad_callback)( void *aux ); void *bad_callback_aux; @@ -78,8 +84,10 @@ proxy_make_flags( uchar flags, char *buf ) static void proxy_store_deref( proxy_store_t *ctx ) { - if (!--ctx->ref_count) + if (!--ctx->ref_count) { + assert( !pending_wakeup( &ctx->wakeup ) ); free( ctx ); + } } static int curr_tag; @@ -87,11 +95,24 @@ static int curr_tag; #define GEN_CMD \ uint ref_count; \ int tag; \ - proxy_store_t *ctx; + proxy_store_t *ctx; \ + gen_cmd_t *next; \ + void (*queued_cb)( gen_cmd_t *gcmd ); -typedef struct { +struct gen_cmd { GEN_CMD -} gen_cmd_t; +}; + +#define GEN_STS_CMD \ + GEN_CMD \ + int sts; + +typedef union { + gen_cmd_t gen; + struct { + GEN_STS_CMD + }; +} gen_sts_cmd_t; static gen_cmd_t * proxy_cmd_new( proxy_store_t *ctx, uint sz ) @@ -113,6 +134,67 @@ proxy_cmd_done( gen_cmd_t *cmd ) } } +static void +proxy_wakeup( void *aux ) +{ + proxy_store_t *ctx = (proxy_store_t *)aux; + + gen_cmd_t *cmd = ctx->done_cmds; + assert( cmd ); + if (!(ctx->done_cmds = cmd->next)) + ctx->done_cmds_append = &ctx->done_cmds; + else + conf_wakeup( &ctx->wakeup, 0 ); + cmd->queued_cb( cmd ); + proxy_cmd_done( cmd ); +} + +static void +proxy_invoke_cb( gen_cmd_t *cmd, void (*cb)( gen_cmd_t * ), int checked, const char *name ) +{ + if (DFlags & FORCEASYNC) { + debug( "%s[% 2d] Callback queue %s%s\n", cmd->ctx->label, cmd->tag, name, checked ? " (checked)" : "" ); + cmd->queued_cb = cb; + cmd->next = NULL; + if (checked) { + *cmd->ctx->check_cmds_append = cmd; + cmd->ctx->check_cmds_append = &cmd->next; + } else { + *cmd->ctx->done_cmds_append = cmd; + cmd->ctx->done_cmds_append = &cmd->next; + conf_wakeup( &cmd->ctx->wakeup, 0 ); + } + } else { + cb( cmd ); + proxy_cmd_done( cmd ); + } +} + +static void +proxy_flush_checked_cmds( proxy_store_t *ctx ) +{ + if (ctx->check_cmds) { + *ctx->done_cmds_append = ctx->check_cmds; + ctx->done_cmds_append = ctx->check_cmds_append; + ctx->check_cmds_append = &ctx->check_cmds; + ctx->check_cmds = NULL; + conf_wakeup( &ctx->wakeup, 0 ); + } +} + +static void +proxy_cancel_checked_cmds( proxy_store_t *ctx ) +{ + gen_cmd_t *cmd; + + while ((cmd = ctx->check_cmds)) { + if (!(ctx->check_cmds = cmd->next)) + ctx->check_cmds_append = &ctx->check_cmds; + ((gen_sts_cmd_t *)cmd)->sts = DRV_CANCELED; + cmd->queued_cb( cmd ); + } +} + #if 0 //# TEMPLATE GETTER static @type@proxy_@name@( store_t *gctx ) @@ -155,9 +237,10 @@ static @type@proxy_@name@( store_t *gctx@decl_args@ ) //# TEMPLATE CALLBACK typedef union { - gen_cmd_t gen; + @gen_cmd_t@ gen; struct { - GEN_CMD + @GEN_CMD@ + @decl_cb_state@ void (*callback)( @decl_cb_args@void *aux ); void *callback_aux; @decl_state@ @@ -165,16 +248,24 @@ typedef union { } @name@_cmd_t; static void -proxy_@name@_cb( @decl_cb_args@void *aux ) +proxy_do_@name@_cb( gen_cmd_t *gcmd ) { - @name@_cmd_t *cmd = (@name@_cmd_t *)aux; + @name@_cmd_t *cmd = (@name@_cmd_t *)gcmd; @pre_print_cb_args@ debug( "%s[% 2d] Callback enter @name@@print_fmt_cb_args@\n", cmd->ctx->label, cmd->tag@print_pass_cb_args@ ); @print_cb_args@ cmd->callback( @pass_cb_args@cmd->callback_aux ); debug( "%s[% 2d] Callback leave @name@\n", cmd->ctx->label, cmd->tag ); - proxy_cmd_done( &cmd->gen ); +} + +static void +proxy_@name@_cb( @decl_cb_args@void *aux ) +{ + @name@_cmd_t *cmd = (@name@_cmd_t *)aux; + + @save_cb_args@ + proxy_invoke_cb( @gen_cmd@, proxy_do_@name@_cb, @checked@, "@name@" ); } static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@void *aux ), void *aux ) @@ -190,15 +281,15 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v @print_args@ ctx->real_driver->@name@( ctx->real_store@pass_args@, proxy_@name@_cb, cmd ); debug( "%s[% 2d] Leave @name@\n", ctx->label, cmd->tag ); - proxy_cmd_done( &cmd->gen ); + proxy_cmd_done( @gen_cmd@ ); } //# END //# UNDEFINE list_store_print_fmt_cb_args //# UNDEFINE list_store_print_pass_cb_args //# DEFINE list_store_print_cb_args - if (sts == DRV_OK) { - for (string_list_t *box = boxes; box; box = box->next) + if (cmd->sts == DRV_OK) { + for (string_list_t *box = cmd->boxes; box; box = box->next) debug( " %s\n", box->string ); } //# END @@ -217,21 +308,21 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v } //# END //# DEFINE load_box_print_fmt_cb_args , sts=%d, total=%d, recent=%d -//# DEFINE load_box_print_pass_cb_args , sts, total_msgs, recent_msgs +//# DEFINE load_box_print_pass_cb_args , cmd->sts, cmd->total_msgs, cmd->recent_msgs //# DEFINE load_box_print_cb_args - if (sts == DRV_OK) { + if (cmd->sts == DRV_OK) { static char fbuf[as(Flags) + 1]; - for (message_t *msg = msgs; msg; msg = msg->next) + for (message_t *msg = cmd->msgs; msg; msg = msg->next) debug( " uid=%-5u flags=%-4s size=%-6u tuid=%." stringify(TUIDL) "s\n", msg->uid, (msg->status & M_FLAGS) ? (proxy_make_flags( msg->flags, fbuf ), fbuf) : "?", msg->size, *msg->tuid ? msg->tuid : "?" ); } //# END //# DEFINE find_new_msgs_print_fmt_cb_args , sts=%d -//# DEFINE find_new_msgs_print_pass_cb_args , sts +//# DEFINE find_new_msgs_print_pass_cb_args , cmd->sts //# DEFINE find_new_msgs_print_cb_args - if (sts == DRV_OK) { - for (message_t *msg = msgs; msg; msg = msg->next) + if (cmd->sts == DRV_OK) { + for (message_t *msg = cmd->msgs; msg; msg = msg->next) debug( " uid=%-5u tuid=%." stringify(TUIDL) "s\n", msg->uid, msg->tuid ); } //# END @@ -251,7 +342,7 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v //# DEFINE fetch_msg_print_fmt_cb_args , flags=%s, date=%lld, size=%u //# DEFINE fetch_msg_print_pass_cb_args , fbuf, (long long)cmd->data->date, cmd->data->len //# DEFINE fetch_msg_print_cb_args - if (sts == DRV_OK && (DFlags & DEBUG_DRV_ALL)) { + if (cmd->sts == DRV_OK && (DFlags & DEBUG_DRV_ALL)) { printf( "%s=========\n", cmd->ctx->label ); fwrite( cmd->data->data, cmd->data->len, 1, stdout ); printf( "%s=========\n", cmd->ctx->label ); @@ -281,14 +372,29 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v //# END //# DEFINE set_msg_flags_print_fmt_args , uid=%u, add=%s, del=%s //# DEFINE set_msg_flags_print_pass_args , uid, fbuf1, fbuf2 +//# DEFINE set_msg_flags_checked sts == DRV_OK //# DEFINE trash_msg_print_fmt_args , uid=%u //# DEFINE trash_msg_print_pass_args , msg->uid +//# DEFINE commit_cmds_print_args + proxy_flush_checked_cmds( ctx ); +//# END + +//# DEFINE cancel_cmds_print_cb_args + proxy_cancel_checked_cmds( cmd->ctx ); +//# END + +//# DEFINE free_store_print_args + proxy_cancel_checked_cmds( ctx ); +//# END //# DEFINE free_store_action proxy_store_deref( ctx ); //# END +//# DEFINE cancel_store_print_args + proxy_cancel_checked_cmds( ctx ); +//# END //# DEFINE cancel_store_action proxy_store_deref( ctx ); //# END @@ -325,9 +431,12 @@ proxy_alloc_store( store_t *real_ctx, const char *label ) ctx->gen.conf = real_ctx->conf; ctx->ref_count = 1; ctx->label = label; + ctx->done_cmds_append = &ctx->done_cmds; + ctx->check_cmds_append = &ctx->check_cmds; ctx->real_driver = real_ctx->driver; ctx->real_store = real_ctx; ctx->real_driver->set_bad_callback( ctx->real_store, (void (*)(void *))proxy_invoke_bad_callback, ctx ); + init_wakeup( &ctx->wakeup, proxy_wakeup, ctx ); return &ctx->gen; } diff --git a/src/drv_proxy_gen.pl b/src/drv_proxy_gen.pl @@ -150,11 +150,26 @@ for (@ptypes) { } else { if ($cmd_type eq "void " && $cmd_args =~ s/, void \(\*cb\)\( (.*)void \*aux \), void \*aux$//) { my $cmd_cb_args = $1; - $replace{'decl_cb_args'} = $cmd_cb_args; - $replace{'pass_cb_args'} = make_args($cmd_cb_args); - my $cmd_print_cb_args = $cmd_cb_args =~ s/(.*), $/, $1/r; - $replace{'print_pass_cb_args'} = make_args($cmd_print_cb_args); - $replace{'print_fmt_cb_args'} = make_format($cmd_print_cb_args); + if (length($cmd_cb_args)) { + $replace{'decl_cb_args'} = $cmd_cb_args; + my $r_cmd_cb_args = $cmd_cb_args; + $r_cmd_cb_args =~ s/^int sts, // or die("Callback arguments of $cmd_name don't start with sts.\n"); + $replace{'decl_cb_state'} = $r_cmd_cb_args =~ s/, /\;\n/gr; + my $pass_cb_args = make_args($cmd_cb_args); + $replace{'save_cb_args'} = $pass_cb_args =~ s/([^,]+), /cmd->$1 = $1\;\n/gr; + $pass_cb_args =~ s/([^, ]+)/cmd->$1/g; + $replace{'pass_cb_args'} = $pass_cb_args; + $replace{'print_pass_cb_args'} = $pass_cb_args =~ s/(.*), $/, $1/r; + $replace{'print_fmt_cb_args'} = make_format($cmd_cb_args =~ s/(.*), $/, $1/r); + $replace{'gen_cmd_t'} = "gen_sts_cmd_t"; + $replace{'GEN_CMD'} = "GEN_STS_CMD\n"; + $replace{'gen_cmd'} = "&cmd->gen.gen"; + } else { + $replace{'gen_cmd_t'} = "gen_cmd_t"; + $replace{'GEN_CMD'} = "GEN_CMD\n"; + $replace{'gen_cmd'} = "&cmd->gen"; + } + $replace{'checked'} //= '0'; $template = "CALLBACK"; } elsif ($cmd_type eq "void ") { $template = "REGULAR_VOID"; diff --git a/src/main.c b/src/main.c @@ -713,6 +713,9 @@ main( int argc, char **argv ) case 'T': for (; *ochar; ) { switch (*ochar++) { + case 'a': + DFlags |= FORCEASYNC; + break; case 'j': DFlags |= KEEPJOURNAL; JLimit = strtol( ochar, &ochar, 10 ); @@ -861,20 +864,23 @@ sync_chans( main_vars_t *mvars, int ent ) if (mvars->skip) goto next2; mvars->state[F] = mvars->state[N] = ST_FRESH; - if ((DFlags & DEBUG_DRV) || (mvars->chan->stores[F]->driver->get_caps( NULL ) & mvars->chan->stores[N]->driver->get_caps( NULL ) & DRV_VERBOSE)) + uint dcaps[2]; + for (t = 0; t < 2; t++) { + mvars->drv[t] = mvars->chan->stores[t]->driver; + dcaps[t] = mvars->drv[t]->get_caps( NULL ); + } + if ((DFlags & DEBUG_DRV) || (dcaps[F] & dcaps[N] & DRV_VERBOSE)) labels[F] = "F: ", labels[N] = "N: "; else labels[F] = labels[N] = ""; for (t = 0; t < 2; t++) { - driver_t *drv = mvars->chan->stores[t]->driver; - store_t *ctx = drv->alloc_store( mvars->chan->stores[t], labels[t] ); - if (DFlags & DEBUG_DRV) { - drv = &proxy_driver; + store_t *ctx = mvars->drv[t]->alloc_store( mvars->chan->stores[t], labels[t] ); + if ((DFlags & DEBUG_DRV) || ((DFlags & FORCEASYNC) && !(dcaps[t] & DRV_ASYNC))) { + mvars->drv[t] = &proxy_driver; ctx = proxy_alloc_store( ctx, labels[t] ); } - mvars->drv[t] = drv; mvars->ctx[t] = ctx; - drv->set_bad_callback( ctx, store_bad, AUX ); + mvars->drv[t]->set_bad_callback( ctx, store_bad, AUX ); } for (t = 0; ; t++) { info( "Opening %s store %s...\n", str_fn[t], mvars->chan->stores[t]->name ); diff --git a/src/run-tests.pl b/src/run-tests.pl @@ -334,10 +334,10 @@ sub killcfg() unlink ".mbsyncrc"; } -# $options -sub runsync($$) +# $run_async, $mbsync_options, $log_file +sub runsync($$$) { - my ($flags, $file) = @_; + my ($async, $flags, $file) = @_; my $cmd; if ($use_vg) { @@ -345,6 +345,7 @@ sub runsync($$) } else { $flags .= " -D"; } + $flags .= " -Ta" if ($async); $cmd .= "$mbsync -Tz $flags -c .mbsyncrc test"; open FILE, "$cmd 2>&1 |"; my @out = <FILE>; @@ -477,7 +478,7 @@ sub show($$$) showchan("near/.mbsyncstate"); print ");\n"; &writecfg(@sfx); - runsync("", ""); + runsync(0, "", ""); killcfg(); print "my \@X$tx = (\n"; showchan("near/.mbsyncstate"); @@ -681,18 +682,14 @@ sub readfile($) return @nj; } -# $title, \@source_state, \@target_state, @channel_configs -sub test($$$@) +# $run_async, \@source_state, \@target_state, @channel_configs +sub test_impl($$$@) { - my ($ttl, $sx, $tx, @sfx) = @_; - - return 0 if (scalar(@ARGV) && !grep { $_ eq $ttl } @ARGV); - print "Testing: ".$ttl." ...\n"; - &writecfg(@sfx); + my ($async, $sx, $tx, @sfx) = @_; mkchan($$sx[0], $$sx[1], @{ $$sx[2] }); - my ($xc, @ret) = runsync("-Tj", "1-initial.log"); + my ($xc, @ret) = runsync($async, "-Tj", "1-initial.log"); if ($xc || ckchan("near/.mbsyncstate.new", $tx)) { print "Input:\n"; printchan($sx); @@ -710,7 +707,7 @@ sub test($$$@) } my @nj = readfile("near/.mbsyncstate.journal"); - my ($jxc, @jret) = runsync("-0 --no-expunge", "2-replay.log"); + my ($jxc, @jret) = runsync($async, "-0 --no-expunge", "2-replay.log"); if ($jxc || ckstate("near/.mbsyncstate", @{ $$tx[2] })) { print "Journal replay failed.\n"; print "Options:\n"; @@ -729,7 +726,7 @@ sub test($$$@) exit 1; } - my ($ixc, @iret) = runsync("", "3-verify.log"); + my ($ixc, @iret) = runsync($async, "", "3-verify.log"); if ($ixc || ckchan("near/.mbsyncstate", $tx)) { print "Idempotence verification run failed.\n"; print "Input == Expected result:\n"; @@ -752,7 +749,7 @@ sub test($$$@) for (my $l = 1; $l <= $njl; $l++) { mkchan($$sx[0], $$sx[1], @{ $$sx[2] }); - my ($nxc, @nret) = runsync("-Tj$l", "4-interrupt.log"); + my ($nxc, @nret) = runsync($async, "-Tj$l", "4-interrupt.log"); if ($nxc != (100 + ($l & 1)) << 8) { print "Interrupting at step $l/$njl failed.\n"; print "Debug output:\n"; @@ -760,7 +757,7 @@ sub test($$$@) exit 1; } - ($nxc, @nret) = runsync("-Tj", "5-resume.log"); + ($nxc, @nret) = runsync($async, "-Tj", "5-resume.log"); if ($nxc || ckchan("near/.mbsyncstate.new", $tx)) { print "Resuming from step $l/$njl failed.\n"; print "Input:\n"; @@ -785,6 +782,19 @@ sub test($$$@) rmtree "near"; rmtree "far"; } +} + +# $title, \@source_state, \@target_state, @channel_configs +sub test($$$@) +{ + my ($ttl, $sx, $tx, @sfx) = @_; + + return 0 if (scalar(@ARGV) && !grep { $_ eq $ttl } @ARGV); + print "Testing: ".$ttl." ...\n"; + &writecfg(@sfx); + + test_impl(0, $sx, $tx, @sfx); + test_impl(1, $sx, $tx, @sfx); killcfg(); }