commit 1ba0cd7b96de08b843c77e4b6385bf6d34bd2afa
parent 4b498482883176e724d2fc59eb7d06214aa09360
Author: Oswald Buddenhagen <ossi@users.sf.net>
Date: Tue, 31 May 2022 09:58:41 +0200
factor out sync_state.c & sync_p.h from sync.c
while moving the code, localize some variables, and use C99 comments.
Diffstat:
M | src/Makefile.am | | | 4 | ++-- |
M | src/sync.c | | | 644 | +------------------------------------------------------------------------------ |
A | src/sync_p.h | | | 93 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
A | src/sync_state.c | | | 569 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
4 files changed, 665 insertions(+), 645 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
@@ -7,12 +7,12 @@ mbsync_SOURCES = \
driver.c drv_proxy.c \
drv_imap.c \
drv_maildir.c \
- sync.c \
+ sync.c sync_state.c \
main.c
noinst_HEADERS = \
common.h config.h socket.h \
driver.h \
- sync.h
+ sync.h sync_p.h
mbsync_LDADD = $(DB_LIBS) $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) $(Z_LIBS) $(KEYCHAIN_LIBS)
drv_proxy.$(OBJEXT): drv_proxy.inc
diff --git a/src/sync.c b/src/sync.c
@@ -5,16 +5,7 @@
* mbsync - mailbox synchronizer
*/
-#define DEBUG_FLAG DEBUG_SYNC
-
-#include "sync.h"
-
-#include <fcntl.h>
-#include <ctype.h>
-#include <errno.h>
-#include <sys/stat.h>
-
-#define JOURNAL_VERSION "4"
+#include "sync_p.h"
channel_conf_t global_conf;
channel_conf_t *channels;
@@ -26,75 +17,6 @@ int new_total[2], new_done[2];
int flags_total[2], flags_done[2];
int trash_total[2], trash_done[2];
-const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", "pull" };
-
-
-static uchar
-parse_flags( const char *buf )
-{
- uint i, d;
- uchar flags;
-
- for (flags = i = d = 0; i < as(MsgFlags); i++) {
- if (buf[d] == MsgFlags[i]) {
- flags |= (1 << i);
- d++;
- }
- }
- return flags;
-}
-
-// This is the (mostly) persistent status of the sync record.
-// Most of these bits are actually mutually exclusive. It is a
-// bitfield to allow for easy testing for multiple states.
-#define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled)
-#define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed)
-#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry)
-#define S_DUMMY(fn) (1<<(3+(fn))) // f/n message is only a placeholder
-#define S_SKIPPED (1<<5) // pre-1.4 legacy: the entry was not propagated (message is too big)
-#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored
-
-// Ephemeral working set.
-#define W_NEXPIRE (1<<0) // temporary: new expiration state
-#define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion
-#define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge
-#define W_UPGRADE (1<<4) // ephemeral: upgrading placeholder, do not apply MaxSize
-#define W_PURGE (1<<5) // ephemeral: placeholder is being nuked
-
-typedef struct sync_rec {
- struct sync_rec *next;
- /* string_list_t *keywords; */
- uint uid[2];
- message_t *msg[2];
- uchar status, wstate, flags, pflags, aflags[2], dflags[2];
- char tuid[TUIDL];
-} sync_rec_t;
-
-typedef struct {
- int t[2];
- void (*cb)( int sts, void *aux ), *aux;
- char *dname, *jname, *nname, *lname, *box_name[2];
- FILE *jfp, *nfp;
- sync_rec_t *srecs, **srecadd;
- channel_conf_t *chan;
- store_t *ctx[2];
- driver_t *drv[2];
- const char *orig_name[2];
- message_t *msgs[2], *new_msgs[2];
- uint_array_alloc_t trashed_msgs[2];
- int state[2], lfd, ret, existing, replayed;
- uint ref_count, nsrecs, opts[2];
- uint new_pending[2], flags_pending[2], trash_pending[2];
- uint maxuid[2]; // highest UID that was already propagated
- uint oldmaxuid[2]; // highest UID that was already propagated before this run
- uint uidval[2]; // UID validity value
- uint newuidval[2]; // UID validity obtained from driver
- uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
- uint maxxfuid; // highest expired UID on far side
- uint oldmaxxfuid; // highest expired UID on far side before this run
- uchar good_flags[2], bad_flags[2];
-} sync_vars_t;
-
static void sync_ref( sync_vars_t *svars ) { ++svars->ref_count; }
static void sync_deref( sync_vars_t *svars );
static int check_cancel( sync_vars_t *svars );
@@ -139,124 +61,6 @@ static int check_cancel( sync_vars_t *svars );
#define ST_SENDING_NEW (1<<15)
-static void
-create_state( sync_vars_t *svars )
-{
- if (!(svars->nfp = fopen( svars->nname, "w" ))) {
- sys_error( "Error: cannot create new sync state %s", svars->nname );
- exit( 1 );
- }
-}
-
-static void ATTR_PRINTFLIKE(2, 3)
-jFprintf( sync_vars_t *svars, const char *msg, ... )
-{
- va_list va;
-
- if (JLimit && !--JLimit)
- exit( 101 );
- if (!svars->jfp) {
- create_state( svars );
- if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : "w" ))) {
- sys_error( "Error: cannot create journal %s", svars->jname );
- exit( 1 );
- }
- setlinebuf( svars->jfp );
- if (!svars->replayed)
- Fprintf( svars->jfp, JOURNAL_VERSION "\n" );
- }
- va_start( va, msg );
- vFprintf( svars->jfp, msg, va );
- va_end( va );
- if (JLimit && !--JLimit)
- exit( 100 );
-}
-
-#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \
- do { \
- debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \
- jFprintf( svars, log_fmt "\n", deparen(log_args) ); \
- } while (0)
-#define JLOG3(log_fmt, log_args, dbg_fmt) \
- JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args))
-#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \
- JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args))
-#define JLOG_SEL(_1, _2, _3, _4, x, ...) x
-#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, NO_JLOG1)(__VA_ARGS__)
-
-static void
-assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
-{
- srec->uid[t] = uid;
- if (uid == svars->maxuid[t] + 1)
- svars->maxuid[t] = uid;
- srec->status &= ~S_PENDING;
- srec->wstate &= ~W_UPGRADE;
- srec->tuid[0] = 0;
-}
-
-#define ASSIGN_UID(srec, t, nuid, ...) \
- do { \
- JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], nuid), __VA_ARGS__ ); \
- assign_uid( svars, srec, t, nuid ); \
- } while (0)
-
-static void
-assign_tuid( sync_vars_t *svars, sync_rec_t *srec )
-{
- for (uint i = 0; i < TUIDL; i++) {
- uchar c = arc4_getbyte() & 0x3f;
- srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 :
- c < 62 ? c + '0' - 52 : c == 62 ? '+' : '/');
- }
- JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], srec->tuid), "new TUID" );
-}
-
-static int
-match_tuids( sync_vars_t *svars, int t, message_t *msgs )
-{
- sync_rec_t *srec;
- message_t *tmsg, *ntmsg = NULL;
- const char *diag;
- int num_lost = 0;
-
- for (srec = svars->srecs; srec; srec = srec->next) {
- if (srec->status & S_DEAD)
- continue;
- if (!srec->uid[t] && srec->tuid[0]) {
- debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", srec->uid[F], srec->uid[N], srec->tuid );
- for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) {
- if (tmsg->status & M_DEAD)
- continue;
- if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) {
- diag = (tmsg == ntmsg) ? "adjacently" : "after gap";
- goto mfound;
- }
- }
- for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) {
- if (tmsg->status & M_DEAD)
- continue;
- if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) {
- diag = "after reset";
- goto mfound;
- }
- }
- JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID lost" );
- // Note: status remains S_PENDING.
- srec->tuid[0] = 0;
- num_lost++;
- continue;
- mfound:
- tmsg->srec = srec;
- srec->msg[t] = tmsg;
- ntmsg = tmsg->next;
- ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag );
- }
- }
- return num_lost;
-}
-
-
static uchar
sanitize_flags( uchar tflags, sync_vars_t *svars, int t )
{
@@ -627,452 +431,6 @@ check_ret( int sts, void *aux )
} \
INIT_SVARS(vars->aux)
-static char *
-clean_strdup( const char *s )
-{
- char *cs;
- uint i;
-
- cs = nfstrdup( s );
- for (i = 0; cs[i]; i++)
- if (cs[i] == '/')
- cs[i] = '!';
- return cs;
-}
-
-
-static sync_rec_t *
-upgrade_srec( sync_vars_t *svars, sync_rec_t *srec )
-{
- // Create an entry and append it to the current one.
- sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) );
- nsrec->next = srec->next;
- srec->next = nsrec;
- if (svars->srecadd == &srec->next)
- svars->srecadd = &nsrec->next;
- // Move the placeholder to the new entry.
- int t = (srec->status & S_DUMMY(F)) ? F : N;
- nsrec->uid[t] = srec->uid[t];
- srec->uid[t] = 0;
- if (srec->msg[t]) { // NULL during journal replay; is assigned later.
- nsrec->msg[t] = srec->msg[t];
- nsrec->msg[t]->srec = nsrec;
- srec->msg[t] = NULL;
- }
- // Mark the original entry for upgrade.
- srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING;
- srec->wstate |= W_UPGRADE;
- // Mark the placeholder for nuking.
- nsrec->wstate = W_PURGE;
- nsrec->aflags[t] = F_DELETED;
- return nsrec;
-}
-
-static int
-prepare_state( sync_vars_t *svars )
-{
- char *s, *cmname, *csname;
- channel_conf_t *chan;
-
- chan = svars->chan;
- if (!strcmp( chan->sync_state ? chan->sync_state : global_conf.sync_state, "*" )) {
- const char *path = svars->drv[N]->get_box_path( svars->ctx[N] );
- if (!path) {
- error( "Error: store '%s' does not support in-box sync state\n", chan->stores[N]->name );
- return 0;
- }
- nfasprintf( &svars->dname, "%s/." EXE "state", path );
- } else {
- csname = clean_strdup( svars->box_name[N] );
- if (chan->sync_state) {
- nfasprintf( &svars->dname, "%s%s", chan->sync_state, csname );
- } else {
- char c = FieldDelimiter;
- cmname = clean_strdup( svars->box_name[F] );
- nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", global_conf.sync_state,
- c, chan->stores[F]->name, c, cmname, c, chan->stores[N]->name, c, csname );
- free( cmname );
- }
- free( csname );
- if (!(s = strrchr( svars->dname, '/' ))) {
- error( "Error: invalid SyncState location '%s'\n", svars->dname );
- return 0;
- }
- *s = 0;
- if (mkdir( svars->dname, 0700 ) && errno != EEXIST) {
- sys_error( "Error: cannot create SyncState directory '%s'", svars->dname );
- return 0;
- }
- *s = '/';
- }
- nfasprintf( &svars->jname, "%s.journal", svars->dname );
- nfasprintf( &svars->nname, "%s.new", svars->dname );
- nfasprintf( &svars->lname, "%s.lock", svars->dname );
- return 1;
-}
-
-static int
-lock_state( sync_vars_t *svars )
-{
- struct flock lck;
-
- if (svars->lfd >= 0)
- return 1;
- memset( &lck, 0, sizeof(lck) );
-#if SEEK_SET != 0
- lck.l_whence = SEEK_SET;
-#endif
-#if F_WRLCK != 0
- lck.l_type = F_WRLCK;
-#endif
- if ((svars->lfd = open( svars->lname, O_WRONLY|O_CREAT, 0666 )) < 0) {
- sys_error( "Error: cannot create lock file %s", svars->lname );
- return 0;
- }
- if (fcntl( svars->lfd, F_SETLK, &lck )) {
- error( "Error: channel :%s:%s-:%s:%s is locked\n",
- svars->chan->stores[F]->name, svars->orig_name[F], svars->chan->stores[N]->name, svars->orig_name[N] );
- close( svars->lfd );
- svars->lfd = -1;
- return 0;
- }
- return 1;
-}
-
-static void
-save_state( sync_vars_t *svars )
-{
- sync_rec_t *srec;
- char fbuf[16]; /* enlarge when support for keywords is added */
-
- // If no change was made, the state is also unmodified.
- if (!svars->jfp && !svars->replayed)
- return;
-
- if (!svars->nfp)
- create_state( svars );
- Fprintf( svars->nfp,
- "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n",
- svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] );
- if (svars->maxxfuid)
- Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid );
- Fprintf( svars->nfp, "\n" );
- for (srec = svars->srecs; srec; srec = srec->next) {
- if (srec->status & S_DEAD)
- continue;
- make_flags( srec->flags, fbuf );
- Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], srec->uid[N],
- (srec->status & S_DUMMY(F)) ? "<" : (srec->status & S_DUMMY(N)) ? ">" : "",
- (srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf );
- }
-
- Fclose( svars->nfp, 1 );
- if (svars->jfp)
- Fclose( svars->jfp, 0 );
- if (!(DFlags & KEEPJOURNAL)) {
- /* order is important! */
- if (rename( svars->nname, svars->dname ))
- warn( "Warning: cannot commit sync state %s\n", svars->dname );
- else if (unlink( svars->jname ))
- warn( "Warning: cannot delete journal %s\n", svars->jname );
- }
-}
-
-static int
-load_state( sync_vars_t *svars )
-{
- sync_rec_t *srec, *nsrec;
- char *s;
- FILE *jfp;
- uint ll;
- uint maxxnuid = 0;
- char c;
- struct stat st;
- char fbuf[16]; /* enlarge when support for keywords is added */
- char buf[128], buf1[64], buf2[64];
-
- if ((jfp = fopen( svars->dname, "r" ))) {
- if (!lock_state( svars ))
- goto jbail;
- debug( "reading sync state %s ...\n", svars->dname );
- int line = 0;
- while (fgets( buf, sizeof(buf), jfp )) {
- line++;
- if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') {
- error( "Error: incomplete sync state header entry at %s:%d\n", svars->dname, line );
- jbail:
- fclose( jfp );
- return 0;
- }
- if (ll == 1)
- goto gothdr;
- if (line == 1 && isdigit( buf[0] )) { // Pre-1.1 legacy
- if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 ||
- sscanf( buf1, "%u:%u", &svars->uidval[F], &svars->maxuid[F] ) < 2 ||
- sscanf( buf2, "%u:%u:%u", &svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) {
- error( "Error: invalid sync state header in %s\n", svars->dname );
- goto jbail;
- }
- goto gothdr;
- }
- uint uid;
- if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) {
- error( "Error: malformed sync state header entry at %s:%d\n", svars->dname, line );
- goto jbail;
- }
- if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, "MasterUidValidity" ) /* Pre-1.4 legacy */) {
- svars->uidval[F] = uid;
- } else if (!strcmp( buf1, "NearUidValidity" ) || !strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) {
- svars->uidval[N] = uid;
- } else if (!strcmp( buf1, "MaxPulledUid" )) {
- svars->maxuid[F] = uid;
- } else if (!strcmp( buf1, "MaxPushedUid" )) {
- svars->maxuid[N] = uid;
- } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) {
- svars->maxxfuid = uid;
- } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy
- maxxnuid = uid;
- } else {
- error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line );
- goto jbail;
- }
- }
- error( "Error: unterminated sync state header in %s\n", svars->dname );
- goto jbail;
- gothdr:
- while (fgets( buf, sizeof(buf), jfp )) {
- line++;
- if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
- error( "Error: incomplete sync state entry at %s:%d\n", svars->dname, line );
- goto jbail;
- }
- buf[ll] = 0;
- fbuf[0] = 0;
- uint t1, t2;
- if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) {
- error( "Error: invalid sync state entry at %s:%d\n", svars->dname, line );
- goto jbail;
- }
- srec = nfzalloc( sizeof(*srec) );
- srec->uid[F] = t1;
- srec->uid[N] = t2;
- s = fbuf;
- if (*s == '<') {
- s++;
- srec->status = S_DUMMY(F);
- } else if (*s == '>') {
- s++;
- srec->status = S_DUMMY(N);
- }
- if (*s == '^') { // Pre-1.4 legacy
- s++;
- srec->status = S_SKIPPED;
- } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) {
- s++;
- srec->status = S_EXPIRE | S_EXPIRED;
- } else if (srec->uid[F] == (uint)-1) { // Pre-1.3 legacy
- srec->uid[F] = 0;
- srec->status = S_SKIPPED;
- } else if (srec->uid[N] == (uint)-1) {
- srec->uid[N] = 0;
- srec->status = S_SKIPPED;
- }
- srec->flags = parse_flags( s );
- debug( " entry (%u,%u,%u,%s%s)\n", srec->uid[F], srec->uid[N], srec->flags,
- (srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "",
- (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : (srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" );
- *svars->srecadd = srec;
- svars->srecadd = &srec->next;
- svars->nsrecs++;
- }
- fclose( jfp );
- svars->existing = 1;
- } else {
- if (errno != ENOENT) {
- sys_error( "Error: cannot read sync state %s", svars->dname );
- return 0;
- }
- svars->existing = 0;
- }
-
- // This is legacy support for pre-1.3 sync states.
- if (maxxnuid) {
- uint minwuid = UINT_MAX;
- for (srec = svars->srecs; srec; srec = srec->next) {
- if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) || !srec->uid[F])
- continue;
- if (srec->status & S_EXPIRED) {
- if (!srec->uid[N]) {
- // The expired message was already gone.
- continue;
- }
- // The expired message was not expunged yet, so re-examine it.
- // This will happen en masse, so just extend the bulk fetch.
- } else {
- if (srec->uid[N] && maxxnuid >= srec->uid[N]) {
- // The non-expired message is in the generally expired range,
- // so don't make it contribute to the bulk fetch.
- continue;
- }
- // Usual non-expired message.
- }
- if (minwuid > srec->uid[F])
- minwuid = srec->uid[F];
- }
- svars->maxxfuid = minwuid - 1;
- }
-
- int line = 0;
- if ((jfp = fopen( svars->jname, "r" ))) {
- if (!lock_state( svars ))
- goto jbail;
- if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) {
- debug( "recovering journal ...\n" );
- if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
- error( "Error: incomplete journal header in %s\n", svars->jname );
- goto jbail;
- }
- buf[ll] = 0;
- if (!equals( buf, (int)ll, JOURNAL_VERSION, strlen(JOURNAL_VERSION) )) {
- error( "Error: incompatible journal version "
- "(got %s, expected " JOURNAL_VERSION ")\n", buf );
- goto jbail;
- }
- srec = NULL;
- line = 1;
- while (fgets( buf, sizeof(buf), jfp )) {
- line++;
- if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
- error( "Error: incomplete journal entry at %s:%d\n", svars->jname, line );
- goto jbail;
- }
- buf[ll] = 0;
- int tn;
- uint t1, t2, t3, t4;
- if ((c = buf[0]) == '#' ?
- (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
- c == '!' ?
- (sscanf( buf + 2, "%u", &t1 ) != 1) :
- c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ?
- (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) :
- c != '^' ?
- (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3) :
- (sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4))
- {
- error( "Error: malformed journal entry at %s:%d\n", svars->jname, line );
- goto jbail;
- }
- if (c == 'N') {
- svars->maxuid[t1] = t2;
- } else if (c == 'F') {
- svars->finduid[t1] = t2;
- } else if (c == 'T') {
- *uint_array_append( &svars->trashed_msgs[t1] ) = t2;
- } else if (c == '!') {
- svars->maxxfuid = t1;
- } else if (c == '|') {
- svars->uidval[F] = t1;
- svars->uidval[N] = t2;
- } else if (c == '+') {
- srec = nfzalloc( sizeof(*srec) );
- srec->uid[F] = t1;
- srec->uid[N] = t2;
- debug( " new entry(%u,%u)\n", t1, t2 );
- srec->status = S_PENDING;
- *svars->srecadd = srec;
- svars->srecadd = &srec->next;
- svars->nsrecs++;
- } else {
- for (nsrec = srec; srec; srec = srec->next)
- if (srec->uid[F] == t1 && srec->uid[N] == t2)
- goto syncfnd;
- for (srec = svars->srecs; srec != nsrec; srec = srec->next)
- if (srec->uid[F] == t1 && srec->uid[N] == t2)
- goto syncfnd;
- error( "Error: journal entry at %s:%d refers to non-existing sync state entry\n", svars->jname, line );
- goto jbail;
- syncfnd:
- debugn( " entry(%u,%u,%u) ", srec->uid[F], srec->uid[N], srec->flags );
- switch (c) {
- case '-':
- debug( "killed\n" );
- srec->status = S_DEAD;
- break;
- case '=':
- debug( "aborted\n" );
- if (svars->maxxfuid < srec->uid[F])
- svars->maxxfuid = srec->uid[F];
- srec->status = S_DEAD;
- break;
- case '#':
- memcpy( srec->tuid, buf + tn + 2, TUIDL );
- debug( "TUID now %." stringify(TUIDL) "s\n", srec->tuid );
- break;
- case '&':
- debug( "TUID %." stringify(TUIDL) "s lost\n", srec->tuid );
- srec->tuid[0] = 0;
- break;
- case '<':
- debug( "far side now %u\n", t3 );
- assign_uid( svars, srec, F, t3 );
- break;
- case '>':
- debug( "near side now %u\n", t3 );
- assign_uid( svars, srec, N, t3 );
- break;
- case '*':
- debug( "flags now %u\n", t3 );
- srec->flags = (uchar)t3;
- srec->aflags[F] = srec->aflags[N] = 0; // Clear F_DELETED from purge
- srec->wstate &= ~W_PURGE;
- break;
- case '~':
- debug( "status now %#x\n", t3 );
- srec->status = (uchar)t3;
- break;
- case '_':
- debug( "has placeholder now\n" );
- srec->status = S_PENDING; // Pre-1.4 legacy only
- srec->status |= !srec->uid[F] ? S_DUMMY(F) : S_DUMMY(N);
- break;
- case '^':
- debug( "is being upgraded, flags %u, srec flags %u\n", t3, t4 );
- srec->pflags = (uchar)t3;
- srec->flags = (uchar)t4;
- srec = upgrade_srec( svars, srec );
- break;
- default:
- error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line );
- goto jbail;
- }
- }
- }
- }
- fclose( jfp );
- sort_uint_array( svars->trashed_msgs[F].array );
- sort_uint_array( svars->trashed_msgs[N].array );
- } else {
- if (errno != ENOENT) {
- sys_error( "Error: cannot read journal %s", svars->jname );
- return 0;
- }
- }
- svars->replayed = line;
-
- return 1;
-}
-
-static void
-delete_state( sync_vars_t *svars )
-{
- unlink( svars->nname );
- unlink( svars->jname );
- if (unlink( svars->dname ) || unlink( svars->lname )) {
- sys_error( "Error: channel %s: sync state cannot be deleted", svars->chan->name );
- svars->ret = SYNC_FAIL;
- }
-}
-
static void box_confirmed( int sts, uint uidvalidity, void *aux );
static void box_confirmed2( sync_vars_t *svars, int t );
static void box_deleted( int sts, void *aux );
diff --git a/src/sync_p.h b/src/sync_p.h
@@ -0,0 +1,93 @@
+// SPDX-FileCopyrightText: 2002-2022 Oswald Buddenhagen <ossi@users.sf.net>
+// SPDX-License-Identifier: GPL-2.0-or-later WITH LicenseRef-isync-GPL-exception
+//
+// mbsync - mailbox synchronizer
+//
+
+#define DEBUG_FLAG DEBUG_SYNC
+
+#include "sync.h"
+
+// This is the (mostly) persistent status of the sync record.
+// Most of these bits are actually mutually exclusive. It is a
+// bitfield to allow for easy testing for multiple states.
+#define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled)
+#define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed)
+#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry)
+#define S_DUMMY(fn) (1<<(3+(fn))) // f/n message is only a placeholder
+#define S_SKIPPED (1<<5) // pre-1.4 legacy: the entry was not propagated (message is too big)
+#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored
+
+// Ephemeral working set.
+#define W_NEXPIRE (1<<0) // temporary: new expiration state
+#define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion
+#define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge
+#define W_UPGRADE (1<<4) // ephemeral: upgrading placeholder, do not apply MaxSize
+#define W_PURGE (1<<5) // ephemeral: placeholder is being nuked
+
+typedef struct sync_rec {
+ struct sync_rec *next;
+ /* string_list_t *keywords; */
+ uint uid[2];
+ message_t *msg[2];
+ uchar status, wstate, flags, pflags, aflags[2], dflags[2];
+ char tuid[TUIDL];
+} sync_rec_t;
+
+typedef struct {
+ int t[2];
+ void (*cb)( int sts, void *aux ), *aux;
+ char *dname, *jname, *nname, *lname, *box_name[2];
+ FILE *jfp, *nfp;
+ sync_rec_t *srecs, **srecadd;
+ channel_conf_t *chan;
+ store_t *ctx[2];
+ driver_t *drv[2];
+ const char *orig_name[2];
+ message_t *msgs[2], *new_msgs[2];
+ uint_array_alloc_t trashed_msgs[2];
+ int state[2], lfd, ret, existing, replayed;
+ uint ref_count, nsrecs, opts[2];
+ uint new_pending[2], flags_pending[2], trash_pending[2];
+ uint maxuid[2]; // highest UID that was already propagated
+ uint oldmaxuid[2]; // highest UID that was already propagated before this run
+ uint uidval[2]; // UID validity value
+ uint newuidval[2]; // UID validity obtained from driver
+ uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
+ uint maxxfuid; // highest expired UID on far side
+ uint oldmaxxfuid; // highest expired UID on far side before this run
+ uchar good_flags[2], bad_flags[2];
+} sync_vars_t;
+
+int prepare_state( sync_vars_t *svars );
+int lock_state( sync_vars_t *svars );
+int load_state( sync_vars_t *svars );
+void save_state( sync_vars_t *svars );
+void delete_state( sync_vars_t *svars );
+
+void ATTR_PRINTFLIKE(2, 3) jFprintf( sync_vars_t *svars, const char *msg, ... );
+
+#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \
+ do { \
+ debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \
+ jFprintf( svars, log_fmt "\n", deparen(log_args) ); \
+ } while (0)
+#define JLOG3(log_fmt, log_args, dbg_fmt) \
+ JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args))
+#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \
+ JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args))
+#define JLOG_SEL(_1, _2, _3, _4, x, ...) x
+#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, NO_JLOG1)(__VA_ARGS__)
+
+void assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid );
+
+#define ASSIGN_UID(srec, t, nuid, ...) \
+ do { \
+ JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], nuid), __VA_ARGS__ ); \
+ assign_uid( svars, srec, t, nuid ); \
+ } while (0)
+
+void assign_tuid( sync_vars_t *svars, sync_rec_t *srec );
+int match_tuids( sync_vars_t *svars, int t, message_t *msgs );
+
+sync_rec_t *upgrade_srec( sync_vars_t *svars, sync_rec_t *srec );
diff --git a/src/sync_state.c b/src/sync_state.c
@@ -0,0 +1,569 @@
+// SPDX-FileCopyrightText: 2004-2022 Oswald Buddenhagen <ossi@users.sf.net>
+// SPDX-License-Identifier: GPL-2.0-or-later WITH LicenseRef-isync-GPL-exception
+//
+// mbsync - mailbox synchronizer
+//
+
+#define DEBUG_FLAG DEBUG_SYNC
+
+#include "sync_p.h"
+
+#include <fcntl.h>
+#include <ctype.h>
+#include <errno.h>
+#include <sys/stat.h>
+
+#define JOURNAL_VERSION "4"
+
+const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", "pull" };
+
+static char *
+clean_strdup( const char *s )
+{
+ char *cs = nfstrdup( s );
+ for (uint i = 0; cs[i]; i++)
+ if (cs[i] == '/')
+ cs[i] = '!';
+ return cs;
+}
+
+int
+prepare_state( sync_vars_t *svars )
+{
+ channel_conf_t *chan = svars->chan;
+ if (!strcmp( chan->sync_state ? chan->sync_state : global_conf.sync_state, "*" )) {
+ const char *path = svars->drv[N]->get_box_path( svars->ctx[N] );
+ if (!path) {
+ error( "Error: store '%s' does not support in-box sync state\n", chan->stores[N]->name );
+ return 0;
+ }
+ nfasprintf( &svars->dname, "%s/." EXE "state", path );
+ } else {
+ char *cnname = clean_strdup( svars->box_name[N] );
+ if (chan->sync_state) {
+ nfasprintf( &svars->dname, "%s%s", chan->sync_state, cnname );
+ } else {
+ char c = FieldDelimiter;
+ char *cfname = clean_strdup( svars->box_name[F] );
+ nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", global_conf.sync_state,
+ c, chan->stores[F]->name, c, cfname, c, chan->stores[N]->name, c, cnname );
+ free( cfname );
+ }
+ free( cnname );
+ char *s;
+ if (!(s = strrchr( svars->dname, '/' ))) {
+ error( "Error: invalid SyncState location '%s'\n", svars->dname );
+ return 0;
+ }
+ // Note that this may be shorter than the configuration value,
+ // as that may contain a filename prefix.
+ *s = 0;
+ if (mkdir( svars->dname, 0700 ) && errno != EEXIST) {
+ sys_error( "Error: cannot create SyncState directory '%s'", svars->dname );
+ return 0;
+ }
+ *s = '/';
+ }
+ nfasprintf( &svars->jname, "%s.journal", svars->dname );
+ nfasprintf( &svars->nname, "%s.new", svars->dname );
+ nfasprintf( &svars->lname, "%s.lock", svars->dname );
+ return 1;
+}
+
+int
+lock_state( sync_vars_t *svars )
+{
+ struct flock lck;
+
+ if (svars->lfd >= 0)
+ return 1;
+ memset( &lck, 0, sizeof(lck) );
+#if SEEK_SET != 0
+ lck.l_whence = SEEK_SET;
+#endif
+#if F_WRLCK != 0
+ lck.l_type = F_WRLCK;
+#endif
+ if ((svars->lfd = open( svars->lname, O_WRONLY | O_CREAT, 0666 )) < 0) {
+ sys_error( "Error: cannot create lock file %s", svars->lname );
+ return 0;
+ }
+ if (fcntl( svars->lfd, F_SETLK, &lck )) {
+ error( "Error: channel :%s:%s-:%s:%s is locked\n",
+ svars->chan->stores[F]->name, svars->orig_name[F], svars->chan->stores[N]->name, svars->orig_name[N] );
+ close( svars->lfd );
+ svars->lfd = -1;
+ return 0;
+ }
+ return 1;
+}
+
+static uchar
+parse_flags( const char *buf )
+{
+ uchar flags = 0;
+ for (uint i = 0, d = 0; i < as(MsgFlags); i++) {
+ if (buf[d] == MsgFlags[i]) {
+ flags |= (1 << i);
+ d++;
+ }
+ }
+ return flags;
+}
+
+int
+load_state( sync_vars_t *svars )
+{
+ sync_rec_t *srec, *nsrec;
+ FILE *jfp;
+ uint ll;
+ uint maxxnuid = 0;
+ char fbuf[16]; // enlarge when support for keywords is added
+ char buf[128], buf1[64], buf2[64];
+
+ if ((jfp = fopen( svars->dname, "r" ))) {
+ if (!lock_state( svars ))
+ goto jbail;
+ debug( "reading sync state %s ...\n", svars->dname );
+ int line = 0;
+ while (fgets( buf, sizeof(buf), jfp )) {
+ line++;
+ if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') {
+ error( "Error: incomplete sync state header entry at %s:%d\n", svars->dname, line );
+ jbail:
+ fclose( jfp );
+ return 0;
+ }
+ if (ll == 1)
+ goto gothdr;
+ if (line == 1 && isdigit( buf[0] )) { // Pre-1.1 legacy
+ if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 ||
+ sscanf( buf1, "%u:%u", &svars->uidval[F], &svars->maxuid[F] ) < 2 ||
+ sscanf( buf2, "%u:%u:%u", &svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) {
+ error( "Error: invalid sync state header in %s\n", svars->dname );
+ goto jbail;
+ }
+ goto gothdr;
+ }
+ uint uid;
+ if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) {
+ error( "Error: malformed sync state header entry at %s:%d\n", svars->dname, line );
+ goto jbail;
+ }
+ if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, "MasterUidValidity" ) /* Pre-1.4 legacy */) {
+ svars->uidval[F] = uid;
+ } else if (!strcmp( buf1, "NearUidValidity" ) || !strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) {
+ svars->uidval[N] = uid;
+ } else if (!strcmp( buf1, "MaxPulledUid" )) {
+ svars->maxuid[F] = uid;
+ } else if (!strcmp( buf1, "MaxPushedUid" )) {
+ svars->maxuid[N] = uid;
+ } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) {
+ svars->maxxfuid = uid;
+ } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy
+ maxxnuid = uid;
+ } else {
+ error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line );
+ goto jbail;
+ }
+ }
+ error( "Error: unterminated sync state header in %s\n", svars->dname );
+ goto jbail;
+ gothdr:
+ while (fgets( buf, sizeof(buf), jfp )) {
+ line++;
+ if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
+ error( "Error: incomplete sync state entry at %s:%d\n", svars->dname, line );
+ goto jbail;
+ }
+ buf[ll] = 0;
+ fbuf[0] = 0;
+ uint t1, t2;
+ if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) {
+ error( "Error: invalid sync state entry at %s:%d\n", svars->dname, line );
+ goto jbail;
+ }
+ srec = nfzalloc( sizeof(*srec) );
+ srec->uid[F] = t1;
+ srec->uid[N] = t2;
+ char *s = fbuf;
+ if (*s == '<') {
+ s++;
+ srec->status = S_DUMMY(F);
+ } else if (*s == '>') {
+ s++;
+ srec->status = S_DUMMY(N);
+ }
+ if (*s == '^') { // Pre-1.4 legacy
+ s++;
+ srec->status = S_SKIPPED;
+ } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) {
+ s++;
+ srec->status = S_EXPIRE | S_EXPIRED;
+ } else if (srec->uid[F] == (uint)-1) { // Pre-1.3 legacy
+ srec->uid[F] = 0;
+ srec->status = S_SKIPPED;
+ } else if (srec->uid[N] == (uint)-1) {
+ srec->uid[N] = 0;
+ srec->status = S_SKIPPED;
+ }
+ srec->flags = parse_flags( s );
+ debug( " entry (%u,%u,%u,%s%s)\n", srec->uid[F], srec->uid[N], srec->flags,
+ (srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "",
+ (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : (srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" );
+ *svars->srecadd = srec;
+ svars->srecadd = &srec->next;
+ svars->nsrecs++;
+ }
+ fclose( jfp );
+ svars->existing = 1;
+ } else {
+ if (errno != ENOENT) {
+ sys_error( "Error: cannot read sync state %s", svars->dname );
+ return 0;
+ }
+ svars->existing = 0;
+ }
+
+ // This is legacy support for pre-1.3 sync states.
+ if (maxxnuid) {
+ uint minwuid = UINT_MAX;
+ for (srec = svars->srecs; srec; srec = srec->next) {
+ if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) || !srec->uid[F])
+ continue;
+ if (srec->status & S_EXPIRED) {
+ if (!srec->uid[N]) {
+ // The expired message was already gone.
+ continue;
+ }
+ // The expired message was not expunged yet, so re-examine it.
+ // This will happen en masse, so just extend the bulk fetch.
+ } else {
+ if (srec->uid[N] && maxxnuid >= srec->uid[N]) {
+ // The non-expired message is in the generally expired range,
+ // so don't make it contribute to the bulk fetch.
+ continue;
+ }
+ // Usual non-expired message.
+ }
+ if (minwuid > srec->uid[F])
+ minwuid = srec->uid[F];
+ }
+ svars->maxxfuid = minwuid - 1;
+ }
+
+ int line = 0;
+ if ((jfp = fopen( svars->jname, "r" ))) {
+ if (!lock_state( svars ))
+ goto jbail;
+ struct stat st;
+ if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) {
+ debug( "recovering journal ...\n" );
+ if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
+ error( "Error: incomplete journal header in %s\n", svars->jname );
+ goto jbail;
+ }
+ buf[ll] = 0;
+ if (!equals( buf, (int)ll, JOURNAL_VERSION, strlen(JOURNAL_VERSION) )) {
+ error( "Error: incompatible journal version"
+ " (got %s, expected " JOURNAL_VERSION ")\n", buf );
+ goto jbail;
+ }
+ srec = NULL;
+ line = 1;
+ while (fgets( buf, sizeof(buf), jfp )) {
+ line++;
+ if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
+ error( "Error: incomplete journal entry at %s:%d\n", svars->jname, line );
+ goto jbail;
+ }
+ buf[ll] = 0;
+ char c;
+ int tn;
+ uint t1, t2, t3, t4;
+ if ((c = buf[0]) == '#' ?
+ (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
+ c == '!' ?
+ (sscanf( buf + 2, "%u", &t1 ) != 1) :
+ c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ?
+ (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) :
+ c != '^' ?
+ (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3) :
+ (sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4))
+ {
+ error( "Error: malformed journal entry at %s:%d\n", svars->jname, line );
+ goto jbail;
+ }
+ if (c == 'N') {
+ svars->maxuid[t1] = t2;
+ } else if (c == 'F') {
+ svars->finduid[t1] = t2;
+ } else if (c == 'T') {
+ *uint_array_append( &svars->trashed_msgs[t1] ) = t2;
+ } else if (c == '!') {
+ svars->maxxfuid = t1;
+ } else if (c == '|') {
+ svars->uidval[F] = t1;
+ svars->uidval[N] = t2;
+ } else if (c == '+') {
+ srec = nfzalloc( sizeof(*srec) );
+ srec->uid[F] = t1;
+ srec->uid[N] = t2;
+ debug( " new entry(%u,%u)\n", t1, t2 );
+ srec->status = S_PENDING;
+ *svars->srecadd = srec;
+ svars->srecadd = &srec->next;
+ svars->nsrecs++;
+ } else {
+ for (nsrec = srec; srec; srec = srec->next)
+ if (srec->uid[F] == t1 && srec->uid[N] == t2)
+ goto syncfnd;
+ for (srec = svars->srecs; srec != nsrec; srec = srec->next)
+ if (srec->uid[F] == t1 && srec->uid[N] == t2)
+ goto syncfnd;
+ error( "Error: journal entry at %s:%d refers to non-existing sync state entry\n", svars->jname, line );
+ goto jbail;
+ syncfnd:
+ debugn( " entry(%u,%u,%u) ", srec->uid[F], srec->uid[N], srec->flags );
+ switch (c) {
+ case '-':
+ debug( "killed\n" );
+ srec->status = S_DEAD;
+ break;
+ case '=':
+ debug( "aborted\n" );
+ if (svars->maxxfuid < srec->uid[F])
+ svars->maxxfuid = srec->uid[F];
+ srec->status = S_DEAD;
+ break;
+ case '#':
+ memcpy( srec->tuid, buf + tn + 2, TUIDL );
+ debug( "TUID now %." stringify(TUIDL) "s\n", srec->tuid );
+ break;
+ case '&':
+ debug( "TUID %." stringify(TUIDL) "s lost\n", srec->tuid );
+ srec->tuid[0] = 0;
+ break;
+ case '<':
+ debug( "far side now %u\n", t3 );
+ assign_uid( svars, srec, F, t3 );
+ break;
+ case '>':
+ debug( "near side now %u\n", t3 );
+ assign_uid( svars, srec, N, t3 );
+ break;
+ case '*':
+ debug( "flags now %u\n", t3 );
+ srec->flags = (uchar)t3;
+ srec->aflags[F] = srec->aflags[N] = 0; // Clear F_DELETED from purge
+ srec->wstate &= ~W_PURGE;
+ break;
+ case '~':
+ debug( "status now %#x\n", t3 );
+ srec->status = (uchar)t3;
+ break;
+ case '_':
+ debug( "has placeholder now\n" );
+ srec->status = S_PENDING; // Pre-1.4 legacy only
+ srec->status |= !srec->uid[F] ? S_DUMMY(F) : S_DUMMY(N);
+ break;
+ case '^':
+ debug( "is being upgraded, flags %u, srec flags %u\n", t3, t4 );
+ srec->pflags = (uchar)t3;
+ srec->flags = (uchar)t4;
+ srec = upgrade_srec( svars, srec );
+ break;
+ default:
+ error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line );
+ goto jbail;
+ }
+ }
+ }
+ }
+ fclose( jfp );
+ sort_uint_array( svars->trashed_msgs[F].array );
+ sort_uint_array( svars->trashed_msgs[N].array );
+ } else {
+ if (errno != ENOENT) {
+ sys_error( "Error: cannot read journal %s", svars->jname );
+ return 0;
+ }
+ }
+ svars->replayed = line;
+
+ return 1;
+}
+
+static void
+create_state( sync_vars_t *svars )
+{
+ if (!(svars->nfp = fopen( svars->nname, "w" ))) {
+ sys_error( "Error: cannot create new sync state %s", svars->nname );
+ exit( 1 );
+ }
+}
+
+void
+jFprintf( sync_vars_t *svars, const char *msg, ... )
+{
+ va_list va;
+
+ if (JLimit && !--JLimit)
+ exit( 101 );
+ if (!svars->jfp) {
+ create_state( svars );
+ if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : "w" ))) {
+ sys_error( "Error: cannot create journal %s", svars->jname );
+ exit( 1 );
+ }
+ setlinebuf( svars->jfp );
+ if (!svars->replayed)
+ Fprintf( svars->jfp, JOURNAL_VERSION "\n" );
+ }
+ va_start( va, msg );
+ vFprintf( svars->jfp, msg, va );
+ va_end( va );
+ if (JLimit && !--JLimit)
+ exit( 100 );
+}
+
+void
+save_state( sync_vars_t *svars )
+{
+ // If no change was made, the state is also unmodified.
+ if (!svars->jfp && !svars->replayed)
+ return;
+
+ if (!svars->nfp)
+ create_state( svars );
+ Fprintf( svars->nfp,
+ "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n",
+ svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] );
+ if (svars->maxxfuid)
+ Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid );
+ Fprintf( svars->nfp, "\n" );
+ for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) {
+ if (srec->status & S_DEAD)
+ continue;
+ char fbuf[16]; // enlarge when support for keywords is added
+ make_flags( srec->flags, fbuf );
+ Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], srec->uid[N],
+ (srec->status & S_DUMMY(F)) ? "<" : (srec->status & S_DUMMY(N)) ? ">" : "",
+ (srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf );
+ }
+
+ Fclose( svars->nfp, 1 );
+ if (svars->jfp)
+ Fclose( svars->jfp, 0 );
+ if (!(DFlags & KEEPJOURNAL)) {
+ // Order is important!
+ if (rename( svars->nname, svars->dname ))
+ warn( "Warning: cannot commit sync state %s\n", svars->dname );
+ else if (unlink( svars->jname ))
+ warn( "Warning: cannot delete journal %s\n", svars->jname );
+ }
+}
+
+void
+delete_state( sync_vars_t *svars )
+{
+ unlink( svars->nname );
+ unlink( svars->jname );
+ if (unlink( svars->dname ) || unlink( svars->lname )) {
+ sys_error( "Error: channel %s: sync state cannot be deleted", svars->chan->name );
+ svars->ret = SYNC_FAIL;
+ }
+}
+
+
+void
+assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
+{
+ srec->uid[t] = uid;
+ if (uid == svars->maxuid[t] + 1)
+ svars->maxuid[t] = uid;
+ srec->status &= ~S_PENDING;
+ srec->wstate &= ~W_UPGRADE;
+ srec->tuid[0] = 0;
+}
+
+void
+assign_tuid( sync_vars_t *svars, sync_rec_t *srec )
+{
+ for (uint i = 0; i < TUIDL; i++) {
+ uchar c = arc4_getbyte() & 0x3f;
+ srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 :
+ c < 62 ? c + '0' - 52 : c == 62 ? '+' : '/');
+ }
+ JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], srec->tuid), "new TUID" );
+}
+
+int
+match_tuids( sync_vars_t *svars, int t, message_t *msgs )
+{
+ message_t *tmsg, *ntmsg = NULL;
+ const char *diag;
+ int num_lost = 0;
+
+ for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) {
+ if (srec->status & S_DEAD)
+ continue;
+ if (!srec->uid[t] && srec->tuid[0]) {
+ debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", srec->uid[F], srec->uid[N], srec->tuid );
+ for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) {
+ if (tmsg->status & M_DEAD)
+ continue;
+ if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) {
+ diag = (tmsg == ntmsg) ? "adjacently" : "after gap";
+ goto mfound;
+ }
+ }
+ for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) {
+ if (tmsg->status & M_DEAD)
+ continue;
+ if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) {
+ diag = "after reset";
+ goto mfound;
+ }
+ }
+ JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID lost" );
+ // Note: status remains S_PENDING.
+ srec->tuid[0] = 0;
+ num_lost++;
+ continue;
+ mfound:
+ tmsg->srec = srec;
+ srec->msg[t] = tmsg;
+ ntmsg = tmsg->next;
+ ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag );
+ }
+ }
+ return num_lost;
+}
+
+sync_rec_t *
+upgrade_srec( sync_vars_t *svars, sync_rec_t *srec )
+{
+ // Create an entry and append it to the current one.
+ sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) );
+ nsrec->next = srec->next;
+ srec->next = nsrec;
+ if (svars->srecadd == &srec->next)
+ svars->srecadd = &nsrec->next;
+ // Move the placeholder to the new entry.
+ int t = (srec->status & S_DUMMY(F)) ? F : N;
+ nsrec->uid[t] = srec->uid[t];
+ srec->uid[t] = 0;
+ if (srec->msg[t]) { // NULL during journal replay; is assigned later.
+ nsrec->msg[t] = srec->msg[t];
+ nsrec->msg[t]->srec = nsrec;
+ srec->msg[t] = NULL;
+ }
+ // Mark the original entry for upgrade.
+ srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING;
+ srec->wstate |= W_UPGRADE;
+ // Mark the placeholder for nuking.
+ nsrec->wstate = W_PURGE;
+ nsrec->aflags[t] = F_DELETED;
+ return nsrec;
+}