1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
  
     | 
    
      /****************************************************************
 *								*
 * Copyright (c) 2016-2022 Fidelity National Information	*
 * Services, Inc. and/or its subsidiaries. All rights reserved.	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license.  If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/
#include "mdef.h"
#include <error.h>
#include "gdsroot.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "wcs_sleep.h"
#include "filestruct.h"
#include "gtmio.h"
#include "wcs_wt.h"
#include "jnl.h"
#include "sleep_cnt.h"
#define SMALLNUMCRS	128 /* if a small number is requested use a local rather than the heap */
/* This function waits for "nbuffs" dirty buffers to be moved to the free queue (from the active or wip
 * queue) since function entry. If the active and wip queue become empty, it returns even if the "nbuffs"
 * target is not reached. It uses "cnl->wcs_buffs_freed" to help with this determination. Note that
 * "wcs_buffs_freed" can only be updated by "wcs_wtfini" (which holds crit) so we can expect to see a
 * monotonically increasing value of this shared counter.
 */
int wcs_wtstart_fini(gd_region *reg, int nbuffs, cache_rec_ptr_t cr2flush)
{
	int			numwritesneeded, numwritesissued;
	wtstart_cr_list_t	cr_list;
	cache_que_head_ptr_t 	crq, crwipq;
	cache_rec_ptr_t 	*listcrs, smalllistcrs[SMALLNUMCRS], *heaplistcrs;
	boolean_t		was_crit, wtstartalreadycalled = FALSE, proc_check, tried_to_flush = FALSE;
	gtm_uint64_t 		targetfreedlevel, basefreedlevel, prewtfinifreedlevel, lcl_buffs_freed;
	int4			err_status = 0, num_sleeps = 0;
	sgmnt_addrs             *csa;
	node_local_ptr_t	cnl;
	sgmnt_data_ptr_t	csd;
	jnl_private_control     *jpc;
	cache_rec_ptr_t		older_twin, cr2use;
#ifdef DEBUG
	gtm_uint64_t		dbg_wcs_buffs_freed, prev_dbg_wcs_buffs_freed = 0;
#endif
	csa = &FILE_INFO(reg)->s_addrs;
	csd = csa->hdr;
	cnl = csa->nl;
	crwipq = &csa->acc_meth.bg.cache_state->cacheq_wip;
	crq = &csa->acc_meth.bg.cache_state->cacheq_active;
	heaplistcrs = NULL;
	if (0 >= nbuffs) /* if number not specified, use default (same as "wcs_wtstart") */
		nbuffs = csd->n_wrt_per_flu;
	basefreedlevel = cnl->wcs_buffs_freed;
        /* If we are asked to flush a specific cr AND it is the younger twin, check if the older twin needs
	 * to have its io initiated and reaped first.
	 */
	older_twin = (cache_rec_ptr_t) NULL;
	assert((NULL == cr2flush) || csa->now_crit);
	assert(csd->asyncio);
	if (cr2flush && cr2flush->twin && cr2flush->bt_index)
	{
		older_twin = (cache_rec_ptr_t) GDS_ANY_REL2ABS(csa, cr2flush->twin);
		if (older_twin->dirty)
			nbuffs++; /* We need to flush the older sibling too. */
	}
	targetfreedlevel = basefreedlevel + nbuffs;
	cr_list.listsize = nbuffs;
	if (SMALLNUMCRS < nbuffs)
	{
		heaplistcrs = (cache_rec_ptr_t *)malloc(SIZEOF(cache_rec_ptr_t) * nbuffs);
		cr_list.listcrs = heaplistcrs;
	} else /* if small number of buffers use the local */
		cr_list.listcrs = smalllistcrs;
	while (((lcl_buffs_freed = cnl->wcs_buffs_freed) < targetfreedlevel)
			&& (crq->fl || crwipq->fl))
	{ /* while we have not reached our free target and there are still things to free */
#		ifdef DEBUG
		dbg_wcs_buffs_freed = cnl->wcs_buffs_freed;
		assert(dbg_wcs_buffs_freed >= prev_dbg_wcs_buffs_freed);
		prev_dbg_wcs_buffs_freed = dbg_wcs_buffs_freed;
#		endif
		numwritesissued = 0;
		numwritesneeded = targetfreedlevel - lcl_buffs_freed;
		assert(numwritesneeded <= nbuffs);
		while (0 < numwritesneeded)
		{ /* issue the requested number of writes unless you run out of dirty buffers that don't have outstanding writes  */
#			ifdef DEBUG
			dbg_wcs_buffs_freed = cnl->wcs_buffs_freed;
			assert(dbg_wcs_buffs_freed >= prev_dbg_wcs_buffs_freed);
			prev_dbg_wcs_buffs_freed = dbg_wcs_buffs_freed;
#			endif
			assert(numwritesneeded <= nbuffs);
			/* wcs_wtstart() will initiate up to numwritesneeded writes (limited by the number of dirty buffers
			 * available and buffers being blocked from having their writes initiated)
			 */
			/* If we are asked to flush a specific cr AND it is the younger twin, check if the older twin needs
			 * to have its io initiated (via wcs_wtstart()) (possible if its write errored out and it transitioned
			 * from the wip queue back to the active queue). If it does call wcs_wtstart with the older twin as it
			 * must be flushed first before we can attempt to flush the newer twin.
			 */
			if (older_twin)
		 	{
				if (older_twin->dirty && !older_twin->epid)
				{
					wtstartalreadycalled = TRUE; /* this variable indicates if we ever called wcs_wtstart */
					tried_to_flush = TRUE; /* this variable indicates if we called it in this iteration */
					err_status = wcs_wtstart(reg, numwritesneeded, &cr_list, older_twin);
				} else
				{
					tried_to_flush = FALSE; /* remember that we did not try to initiate any ios */
					cr_list.numcrs = 0; /* since wcs_wtstart was not called indicate 0 ios were initiated. */
					assert(!err_status); /* we should only get here if err_status is 0 */
					err_status = 0; /* for PRO ensure it is set to 0 */
				}
			} else
			{
				wtstartalreadycalled = TRUE;
				tried_to_flush = TRUE;
				err_status = wcs_wtstart(reg, numwritesneeded, &cr_list, cr2flush);
			}
			if (err_status || cnl->wc_blocked) /* if we are blocked get out to allow cache recovery */
				break;
			if (0 != cr_list.numcrs)
			{ /* If we were able to initiate some writes wait for their statuses to complete */
				wcs_wtfini_nocrit(reg, &cr_list);
				numwritesissued += cr_list.numcrs;
				num_sleeps = 0; /* making progress so reset the clock */
				if (numwritesissued >= numwritesneeded)
					break; /* enough i/o statuses are back so lets reap them */
			} else
			 	/* not able to initiate any i/os:
				 * maybe out of i/o slots, buffer(s) may be stuck (jnl write, twin, bt_put)
				 * or we are out of dirty buffers to initiate writes on but there are crs
				 * in the WIP queue or we have previously issued the io for an older twin
				 * and now need to wait for the io to complete (or tried_to_flush=FALSE case).
				 */
				break;
			/* we waited in wcs_wtfini_nocrit() above so check cnl->wcs_buffs_freed to see if any other processes
			 *  helped increase the number of freed buffers.
			 */
			numwritesneeded = targetfreedlevel - cnl->wcs_buffs_freed;
		}
		/* if we got an error from wcs_wtstart() or wcs_wtfini() or we were asked to flush a specific buffer
		 * and it is no longer dirty or cache needs to be verified we are done.
		 */
		if (err_status || (cr2flush && !cr2flush->dirty) || cnl->wc_blocked)
			break;
		/* have i/o statuses on i/os we issued so call wcs_wtfini() to reap them all at once;
		 * other's i/os may also be reaped
		 */
		prewtfinifreedlevel = cnl->wcs_buffs_freed; /* get a base line to see if wtfini is making progress */
		was_crit = csa->now_crit;
		if (!was_crit)
			grab_crit(reg, WS_27);
		/* Reap our i/os (and maybe some others).
		 * If we tried to flush, were unable to, and there is something on the wip queue: a process with an outstanding
		 * i/o may have died and is blocking our ability to initiate i/os; ask wcs_wtfini() to see if all of the processes
		 * that have outstanding writes are alive.
		 */
		proc_check = tried_to_flush && (0 == cr_list.numcrs) && crwipq->fl
			? CHECK_IS_PROC_ALIVE_TRUE : CHECK_IS_PROC_ALIVE_FALSE;
		/* If we are waiting on the completion of the io of an older twin use its cr on the call to wcs_wtfini */
		cr2use = (older_twin && older_twin->dirty & older_twin->epid) ? older_twin : cr2flush;
		err_status = wcs_wtfini(reg, proc_check, cr2use);
                if (older_twin && !older_twin->dirty)
                        /* the older twin has been flushed so we are done with it. */
                        older_twin = (cache_rec_ptr_t) NULL;
		if (!was_crit)
			rel_crit(reg);
		if (prewtfinifreedlevel == cnl->wcs_buffs_freed)
		{
			wcs_sleep(1);	/* wtfini did not make any progress so wait a while.
					 * Note: The sleep time of 1 msec here is factored into the calculation
					 * of MAX_WTSTART_FINI_SLEEPS (it has a MAXSLPTIME multiplicative factor).
					 * Any changes to the 1 parameter need corresponding changes in the macro.
					 */
			num_sleeps++;
		} else
			num_sleeps = 0; /* we are making progress so reset the clock */
		if (MAX_WTSTART_FINI_SLEEPS < num_sleeps) /* waited long enough; initiate cache recovery */
		{
			SET_TRACEABLE_VAR(cnl->wc_blocked, WC_BLOCK_RECOVER);
			break;
		}
		if (err_status)
			break;
	}
	if (!wtstartalreadycalled)
	{
        	jpc = csa->jnl;
        	assert(!JNL_ALLOWED(csd) || ( NULL != jpc));     /* if journaling is allowed, we better have non-null csa->jnl */
		/* wcs_wtstart(), which would call jnl_qio_start(), has not been called. Call jnl_qio_start() in case there are
		 * journal buffers that need to be flushed.
		 */
        	if (JNL_ENABLED(csd) && (NULL != jpc) && (NOJNL != jpc->channel))
                	jnl_qio_start(jpc);
	}
	if (NULL != heaplistcrs)
		free(heaplistcrs);
	return err_status;
}
 
     |