[exim-cvs] Transports: pass back next id for continued-trans…

Top Page
Delete this message
Reply to this message
Author: Exim Git Commits Mailing List
Date:  
To: exim-cvs
Subject: [exim-cvs] Transports: pass back next id for continued-transport
Gitweb: https://git.exim.org/exim.git/commitdiff/be5901583f97a754fe1b66d7f726107ec22636ad
Commit:     be5901583f97a754fe1b66d7f726107ec22636ad
Parent:     a7ea53c5ccf6c81c4716f17428646d0f55f1bed3
Author:     Jeremy Harris <jgh146exb@???>
AuthorDate: Mon Jun 17 15:47:20 2024 +0100
Committer:  Jeremy Harris <jgh146exb@???>
CommitDate: Mon Jun 17 19:33:17 2024 +0100


    Transports: pass back next id for continued-transport
---
 doc/doc-txt/ChangeLog     |  8 +++++
 src/src/deliver.c         | 85 ++++++++++++++++++++++++++++++++---------------
 src/src/globals.c         |  1 +
 src/src/globals.h         |  1 +
 src/src/transport.c       | 66 ++++++++++++++++++------------------
 src/src/transports/smtp.c | 64 ++++++++++++++++++++++-------------
 6 files changed, 141 insertions(+), 84 deletions(-)


diff --git a/doc/doc-txt/ChangeLog b/doc/doc-txt/ChangeLog
index f154430f3..dc1a02c83 100644
--- a/doc/doc-txt/ChangeLog
+++ b/doc/doc-txt/ChangeLog
@@ -2,6 +2,14 @@ This document describes *changes* to previous versions, that might
affect Exim's operation, with an unchanged configuration file. For new
options, and new features, see the NewStuff file next to this ChangeLog.

+Since version 4.98
+------------------
+
+JH/01 Use fewer forks & execs for sending many messages to a single host.
+      By passing back the next message-id from the transport to the delivery
+      process, we can loop there.  A two-phase queue run will benefit,
+      particularly for mailing-list and smarthost cases.
+
 Exim version 4.98
 -----------------


diff --git a/src/src/deliver.c b/src/src/deliver.c
index eadc96d22..d35c20260 100644
--- a/src/src/deliver.c
+++ b/src/src/deliver.c
@@ -3684,20 +3684,34 @@ while (!done)
       while (*ptr++) ;
       break;


-    /* Z marks the logical end of the data. It is followed by '0' if
+    /* Z0 marks the logical end of the data. It is followed by '0' if
     continue_transport was NULL at the end of transporting, otherwise '1'.
     We need to know when it becomes NULL during a delivery down a passed SMTP
     channel so that we don't try to pass anything more down it. Of course, for
-    most normal messages it will remain NULL all the time. */
+    most normal messages it will remain NULL all the time.
+
+    Z1 is a suggested message_id to handle next, used during a
+    continued-transport sequence. */


     case 'Z':
-      if (*ptr == '0')
+      switch (*subid)
     {
-    continue_transport = NULL;
-    continue_hostname = NULL;
+    case '0':
+      if (*ptr == '0')
+        {
+        continue_transport = NULL;
+        continue_hostname = NULL;
+        }
+      done = TRUE;
+      DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
+      break;
+    case '1':
+      if (continue_hostname)
+        Ustrncpy(continue_next_id, ptr, MESSAGE_ID_LENGTH);
+      DEBUG(D_deliver) debug_printf("continue_next_id: %s%s\n",
+        continue_next_id, continue_hostname ? "" : " (ignored)");
+      break;
     }
-      done = TRUE;
-      DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
       break;


     /* Anything else is a disaster. */
@@ -4775,7 +4789,11 @@ all pipes, so I do not see a reason to use non-blocking IO here
     is flagged by an identifying byte, and is then in a fixed format (with
     strings terminated by zeros), and there is a final terminator at the
     end. The host information and retry information is all attached to
-    the first address, so that gets sent at the start. */
+    the first address, so that gets sent at the start.
+
+    Result item tags:
+      A C D H I K L P R S T X Z
+    */


     /* Host unusability information: for most success cases this will
     be null. */
@@ -4784,7 +4802,7 @@ all pipes, so I do not see a reason to use non-blocking IO here
       {
       if (!h->address || h->status < hstatus_unusable) continue;
       sprintf(CS big_buffer, "%c%c%s", h->status, h->why, h->address);
-      rmt_dlv_checked_write(fd, 'H', '0', big_buffer, Ustrlen(big_buffer+2) + 3);
+      rmt_dlv_checked_write(fd, 'H','0', big_buffer, Ustrlen(big_buffer+2) + 3);
       }


     /* The number of bytes written. This is the same for each address. Even
@@ -5017,8 +5035,12 @@ all pipes, so I do not see a reason to use non-blocking IO here
       rmt_dlv_checked_write(fd, 'I', '0', big_buffer, ptr - big_buffer);
       }


+    /* Continuation message-id */
+    if (*continue_next_id)
+      rmt_dlv_checked_write(fd, 'Z', '1', continue_next_id, MESSAGE_ID_LENGTH);
+
     /* Add termination flag, close the pipe, and that's it. The character
-    after 'Z' indicates whether continue_transport is now NULL or not.
+    after "Z0" indicates whether continue_transport is now NULL or not.
     A change from non-NULL to NULL indicates a problem with a continuing
     connection. */


@@ -5501,16 +5523,14 @@ Returns:      nothing
 */


 static void
-do_duplicate_check(address_item **anchor)
+do_duplicate_check(address_item ** anchor)
 {
-address_item *addr;
+address_item * addr;
 while ((addr = *anchor))
   {
-  tree_node *tnode;
+  tree_node * tnode;
   if (testflag(addr, af_pfr))
-    {
-    anchor = &(addr->next);
-    }
+    anchor = &addr->next;
   else if ((tnode = tree_search(tree_duplicates, addr->unique)))
     {
     DEBUG(D_deliver|D_route)
@@ -5523,7 +5543,7 @@ while ((addr = *anchor))
   else
     {
     tree_add_duplicate(addr->unique, addr);
-    anchor = &(addr->next);
+    anchor = &addr->next;
     }
   }
 }
@@ -6433,16 +6453,19 @@ Returns:      When the global variable mua_wrapper is FALSE:
 int
 deliver_message(const uschar * id, BOOL forced, BOOL give_up)
 {
-int i, rc;
-int final_yield = DELIVER_ATTEMPTED_NORMAL;
-time_t now = time(NULL);
-address_item *addr_last = NULL;
-uschar *filter_message = NULL;
-int process_recipients = RECIP_ACCEPT;
-open_db dbblock;
-open_db *dbm_file;
+int i, rc, final_yield, process_recipients;
+time_t now;
+address_item * addr_last;
+uschar * filter_message, * info;
+open_db dbblock, * dbm_file;
 extern int acl_where;
-uschar *info;
+CONTINUED_ID:
+
+final_yield = DELIVER_ATTEMPTED_NORMAL;
+now = time(NULL);
+addr_last = NULL;
+filter_message = NULL;
+process_recipients = RECIP_ACCEPT;


 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"delivery start");    /* testcase 0022, 2100 */
@@ -8626,6 +8649,16 @@ DEBUG(D_deliver) debug_printf("end delivery of %s\n", id);
 report_time_since(&timestamp_startup, US"delivery end"); /* testcase 0005 */
 #endif


+/* If the transport suggested another message to deliver, go round again. */
+
+if (final_yield == DELIVER_ATTEMPTED_NORMAL && *continue_next_id)
+  {
+  tree_duplicates = NULL;    /* discard dups info from old message */
+  id = string_copyn(continue_next_id, MESSAGE_ID_LENGTH);
+  continue_next_id[0] = '\0';
+  goto CONTINUED_ID;
+  }
+
 /* It is unlikely that there will be any cached resources, since they are
 released after routing, and in the delivery subprocesses. However, it's
 possible for an expansion for something afterwards (for example,
diff --git a/src/src/globals.c b/src/src/globals.c
index d51644e05..0f9d5b54f 100644
--- a/src/src/globals.c
+++ b/src/src/globals.c
@@ -746,6 +746,7 @@ BOOL    continue_proxy_dane    = FALSE;
 uschar *continue_proxy_sni     = NULL;
 uschar *continue_hostname      = NULL;
 uschar *continue_host_address  = NULL;
+uschar  continue_next_id[MESSAGE_ID_LENGTH +1] = {[0]='\0'};
 int     continue_sequence      = 1;
 uschar *continue_transport     = NULL;
 #ifndef DISABLE_ESMTP_LIMITS
diff --git a/src/src/globals.h b/src/src/globals.h
index dc9d384db..a82d529c0 100644
--- a/src/src/globals.h
+++ b/src/src/globals.h
@@ -450,6 +450,7 @@ extern BOOL    continue_proxy_dane;    /* proxied conn is DANE */
 extern uschar *continue_proxy_sni;     /* proxied conn SNI */
 extern uschar *continue_hostname;      /* Host for continued delivery */
 extern uschar *continue_host_address;  /* IP address for ditto */
+extern uschar  continue_next_id[];     /* Next message_id from hintsdb */
 extern int     continue_sequence;      /* Sequence num for continued delivery */
 extern uschar *continue_transport;     /* Transport for continued delivery */
 #ifndef DISABLE_ESMTP_LIMITS
diff --git a/src/src/transport.c b/src/src/transport.c
index 84397e9cd..741ffd454 100644
--- a/src/src/transport.c
+++ b/src/src/transport.c
@@ -1499,9 +1499,8 @@ Returns:    nothing
 void
 transport_update_waiting(host_item * hostlist, uschar * tpname)
 {
-const uschar *prevname = US"";
-open_db dbblock;
-open_db *dbm_file;
+const uschar * prevname = US"";
+open_db dbblock, * dbp;


if (!is_new_message_id(message_id))
{
@@ -1514,7 +1513,7 @@ DEBUG(D_transport) debug_printf("updating wait-%s database\n", tpname);

/* Open the database for this transport */

-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", tpname),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", tpname),
               O_RDWR, &dbblock, TRUE, TRUE)))
   return;


@@ -1536,7 +1535,7 @@ for (host_item * host = hostlist; host; host = host->next)

/* Look up the host record; if there isn't one, make an empty one. */

-  if (!(host_record = dbfn_read(dbm_file, host->name)))
+  if (!(host_record = dbfn_read(dbp, host->name)))
     {
     host_record = store_get(sizeof(dbdata_wait) + MESSAGE_ID_LENGTH, GET_UNTAINTED);
     host_record->count = host_record->sequence = 0;
@@ -1560,9 +1559,9 @@ for (host_item * host = hostlist; host; host = host->next)
     debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
       " hints DB; deleting records for %s\n", tpname, host->name);


-      (void) dbfn_delete(dbm_file, host->name);
+      (void) dbfn_delete(dbp, host->name);
       for (int i = host_record->sequence - 1; i >= 0; i--)
-    (void) dbfn_delete(dbm_file,
+    (void) dbfn_delete(dbp,
             (sprintf(CS buffer, "%.200s:%d", host->name, i), buffer));


       host_record->count = host_record->sequence = 0;
@@ -1579,7 +1578,7 @@ for (host_item * host = hostlist; host; host = host->next)
     {
     dbdata_wait *cont;
     sprintf(CS buffer, "%.200s:%d", host->name, i);
-    if ((cont = dbfn_read(dbm_file, buffer)))
+    if ((cont = dbfn_read(dbp, buffer)))
       {
       int clen = cont->count * MESSAGE_ID_LENGTH;
       for (uschar * s = cont->text; s < cont->text + clen; s += MESSAGE_ID_LENGTH)
@@ -1605,7 +1604,7 @@ for (host_item * host = hostlist; host; host = host->next)
   if (host_record->count >= WAIT_NAME_MAX)
     {
     sprintf(CS buffer, "%.200s:%d", host->name, host_record->sequence);
-    dbfn_write(dbm_file, buffer, host_record, sizeof(dbdata_wait) + host_length);
+    dbfn_write(dbp, buffer, host_record, sizeof(dbdata_wait) + host_length);
 #ifndef DISABLE_QUEUE_RAMP
     if (f.queue_2stage && queue_fast_ramp && !queue_run_in_order)
       queue_notify_daemon(message_id);
@@ -1634,14 +1633,14 @@ for (host_item * host = hostlist; host; host = host->next)


/* Update the database */

-  dbfn_write(dbm_file, host->name, host_record, sizeof(dbdata_wait) + host_length);
+  dbfn_write(dbp, host->name, host_record, sizeof(dbdata_wait) + host_length);
   DEBUG(D_transport) debug_printf("added %.*s to queue for %s\n",
                   MESSAGE_ID_LENGTH, message_id, host->name);
   }


/* All now done */

-dbfn_close(dbm_file);
+dbfn_close(dbp);
}


@@ -1688,8 +1687,7 @@ transport_check_waiting(const uschar * transport_name, const uschar * hostname,
{
dbdata_wait * host_record;
int host_length;
-open_db dbblock;
-open_db * dbm_file;
+open_db dbblock, * dbp;

 int         i;
 struct stat statbuf;
@@ -1715,17 +1713,16 @@ if (local_message_max > 0 && continue_sequence >= local_message_max)


/* Open the waiting information database. */

-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", transport_name),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", transport_name),
               O_RDWR, &dbblock, TRUE, TRUE)))
   goto retfalse;


/* See if there is a record for this host; if not, there's nothing to do. */

-if (!(host_record = dbfn_read(dbm_file, hostname)))
+if (!(host_record = dbfn_read(dbp, hostname)))
{
- dbfn_close(dbm_file);
DEBUG(D_transport) debug_printf_indent("no messages waiting for %s\n", hostname);
- goto retfalse;
+ goto dbclose_false;
}

/* If the data in the record looks corrupt, just log something and
@@ -1733,10 +1730,9 @@ don't try to use it. */

 if (host_record->count > WAIT_NAME_MAX)
   {
-  dbfn_close(dbm_file);
   log_write(0, LOG_MAIN|LOG_PANIC, "smtp-wait database entry for %s has bad "
     "count=%d (max=%d)", hostname, host_record->count, WAIT_NAME_MAX);
-  goto retfalse;
+  goto dbclose_false;
   }


 /* Scan the message ids in the record from the end towards the beginning,
@@ -1775,12 +1771,11 @@ while (1)
       DEBUG(D_hints_lookup)
     debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
       " hints DB; deleting records for %s\n", transport_name, hostname);
-      (void) dbfn_delete(dbm_file, hostname);
+      (void) dbfn_delete(dbp, hostname);
       for (int i = host_record->sequence - 1; i >= 0; i--)
-    (void) dbfn_delete(dbm_file,
+    (void) dbfn_delete(dbp,
             (sprintf(CS buffer, "%.200s:%d", hostname, i), buffer));
-      dbfn_close(dbm_file);
-      goto retfalse;
+      goto dbclose_false;
       }
     msgq[i].bKeep = TRUE;


@@ -1860,20 +1855,20 @@ while (1)
     for (int i = host_record->sequence - 1; i >= 0 && !newr; i--)
       {
       sprintf(CS buffer, "%.200s:%d", hostname, i);
-      newr = dbfn_read(dbm_file, buffer);
+      newr = dbfn_read(dbp, buffer);
       }


     /* If no continuation, delete the current and break the loop */


     if (!newr)
       {
-      dbfn_delete(dbm_file, hostname);
+      dbfn_delete(dbp, hostname);
       break;
       }


     /* Else replace the current with the continuation */


-    dbfn_delete(dbm_file, buffer);
+    dbfn_delete(dbp, buffer);
     host_record = newr;
     host_length = host_record->count * MESSAGE_ID_LENGTH;


@@ -1889,9 +1884,8 @@ while (1)

   if (host_length <= 0)
     {
-    dbfn_close(dbm_file);
     DEBUG(D_transport) debug_printf_indent("waiting messages already delivered\n");
-    goto retfalse;
+    goto dbclose_false;
     }


   /* we were not able to find an acceptable message, nor was there a
@@ -1901,8 +1895,7 @@ while (1)
   if (!bContinuation)
     {
     Ustrcpy(new_message_id, message_id);
-    dbfn_close(dbm_file);
-    goto retfalse;
+    goto dbclose_false;
     }
   }        /* we need to process a continuation record */


@@ -1914,10 +1907,11 @@ record if required, close the database, and return TRUE. */
if (host_length > 0)
{
host_record->count = host_length/MESSAGE_ID_LENGTH;
- dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
+ dbfn_write(dbp, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
}

-dbfn_close(dbm_file);
+dbfn_close(dbp);
+
DEBUG(D_transport)
{
acl_level--;
@@ -1925,9 +1919,13 @@ DEBUG(D_transport)
}
return TRUE;

+dbclose_false:
+  dbfn_close(dbp);
+
 retfalse:
-DEBUG(D_transport) {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
-return FALSE;
+  DEBUG(D_transport)
+    {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
+  return FALSE;
 }


/*************************************************
diff --git a/src/src/transports/smtp.c b/src/src/transports/smtp.c
index a5caf3de6..8e4480e12 100644
--- a/src/src/transports/smtp.c
+++ b/src/src/transports/smtp.c
@@ -3800,7 +3800,6 @@ int save_errno;
int rc;

 uschar *message = NULL;
-uschar new_message_id[MESSAGE_ID_LENGTH + 1];
 smtp_context * sx = store_get(sizeof(*sx), GET_TAINTED);    /* tainted, for the data buffers */
 BOOL pass_message = FALSE;
 #ifndef DISABLE_ESMTP_LIMITS
@@ -3809,9 +3808,10 @@ BOOL mail_limit = FALSE;
 #ifdef SUPPORT_DANE
 BOOL dane_held;
 #endif
-BOOL tcw_done = FALSE, tcw = FALSE;
+BOOL tcw_done = FALSE, tcw = FALSE, passback_tcw = FALSE;


*message_defer = FALSE;
+continue_next_id[0] = '\0';

 memset(sx, 0, sizeof(*sx));
 sx->addrlist = addrlist;
@@ -4132,7 +4132,7 @@ else
         &&
 #endif
            transport_check_waiting(tblock->name, host->name,
-             tblock->connection_max_messages, new_message_id,
+             tblock->connection_max_messages, continue_next_id,
          (oicf)smtp_are_same_identities, (void*)&t_compare);
     if (!tcw)
       {
@@ -4699,9 +4699,9 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
     smtp_compare_t t_compare =
       {.tblock = tblock, .current_sender_address = sender_address};


-    if (  sx->first_addr            /* more addrs for this message */
-       || f.continue_more            /* more addrs for continued-host */
-       || tcw_done && tcw            /* more messages for host */
+    if (  sx->first_addr        /* more addrs for this message */
+       || f.continue_more        /* more addrs for continued-host */
+       || tcw_done && tcw        /* more messages for host */
        || (
 #ifndef DISABLE_TLS
          (  tls_out.active.sock < 0  &&  !continue_proxy_cipher
@@ -4710,7 +4710,7 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
       &&
 #endif
          transport_check_waiting(tblock->name, host->name,
-           sx->max_mail, new_message_id,
+           sx->max_mail, continue_next_id,
            (oicf)smtp_are_same_identities, (void*)&t_compare)
        )  )
       {
@@ -4761,6 +4761,20 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
       goto SEND_MESSAGE;
       }


+    /* If there is a next-message-id from the wait-transport hintsdb,
+    pretend caller said it has further messages for us.  Note that we lose
+    the TLS session (below), and that our caller will pass back the id to
+    the delivery process.  If not, remember to later cancel the
+    next-message-id so that the transport-caller code (in deliver.c) does
+    not report it back up the pipe to the delivery process.
+    XXX It would be feasible to also report the other continue_* with the
+    _id - taking out the exec for the first continued-transport. But the
+    actual conn, and its fd, is a problem. Maybe replace the transport
+    pipe with a unix-domain socket? */
+
+    if (!f.continue_more && continue_hostname && *continue_next_id)
+      f.continue_more = passback_tcw = TRUE;
+
     /* Unless caller said it already has more messages listed for this host,
     pass the connection on to a new Exim process (below, the call to
     transport_pass_socket).  If the caller has more ready, just return with
@@ -4780,17 +4794,17 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
         been used, which we do under TLSv1.3 for the gsasl SCRAM*PLUS methods.
         But we were always doing it anyway. */


-      tls_close(sx->cctx.tls_ctx,
-        sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
-      sx->send_tlsclose = FALSE;
-      sx->cctx.tls_ctx = NULL;
-      tls_out.active.sock = -1;
-      smtp_peer_options = smtp_peer_options_wrap;
-      sx->ok = !sx->smtps
-        && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
-        >= 0
-        && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
-                      '2', ob->command_timeout);
+        tls_close(sx->cctx.tls_ctx,
+          sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
+        sx->send_tlsclose = FALSE;
+        sx->cctx.tls_ctx = NULL;
+        tls_out.active.sock = -1;
+        smtp_peer_options = smtp_peer_options_wrap;
+        sx->ok = !sx->smtps
+          && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
+          >= 0
+          && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
+                    '2', ob->command_timeout);


         if (sx->ok && f.continue_more)
           goto TIDYUP;        /* More addresses for another run */
@@ -4822,7 +4836,7 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
   propagate it from the initial
   */
     if (sx->ok && transport_pass_socket(tblock->name, host->name,
-          host->address, new_message_id, socket_fd
+          host->address, continue_next_id, socket_fd
 #ifndef DISABLE_ESMTP_LIMITS
           , sx->peer_limit_mail, sx->peer_limit_rcpt, sx->peer_limit_rcptdom
 #endif
@@ -5014,15 +5028,17 @@ if (mail_limit && sx->first_addr)
   }
 #endif


-return yield;
+OUT:
+ if (!passback_tcw) continue_next_id[0] = '\0';
+ return yield;

 TIDYUP:
 #ifdef SUPPORT_DANE
-if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
-  if (a->transport_return == DANE)
-    a->transport_return = PENDING_DEFER;
+  if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
+    if (a->transport_return == DANE)
+      a->transport_return = PENDING_DEFER;
 #endif
-return yield;
+  goto OUT;
 }




--
## subscription configuration (requires account):
## https://lists.exim.org/mailman3/postorius/lists/exim-cvs.lists.exim.org/
## unsubscribe (doesn't require an account):
## exim-cvs-unsubscribe@???
## Exim details at http://www.exim.org/
## Please use the Wiki with this list - http://wiki.exim.org/