Skip to content

Commit

Permalink
Use atomic row counter instead of regular variable + spinlock
Browse files Browse the repository at this point in the history
  • Loading branch information
zilder authored and rapimo committed Jul 2, 2018
1 parent bc37977 commit 39b0d46
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
31 changes: 11 additions & 20 deletions src/kafka_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,11 @@ kafkaBeginForeignScan(ForeignScanState *node, int eflags)

/* we we get a parallel scan_data_desc will point to a shared mem segment by InitializeDSMForeignScan */
festate->scan_data_desc = (KafkaScanDataDesc *) palloc0(sizeof(KafkaScanDataDesc));
#ifdef DO_PARALLEL
pg_atomic_init_u32(&festate->scan_data_desc->next_scanp, 0);
#else
festate->scan_data_desc->next_scanp = 0;
festate->scan_data_desc->is_parallel = false;

#endif
festate->kafka_options = kafka_options;
festate->parse_options = parse_options;

Expand Down Expand Up @@ -914,15 +916,13 @@ next_work(KafkaScanPData *scan_p, KafkaScanDataDesc *scand)

if (scand == NULL)
return -1;

#ifdef DO_PARALLEL
if (scand->is_parallel)
SpinLockAcquire(&scand->ps_mutex);
#endif
next = pg_atomic_fetch_add_u32(&scand->next_scanp, 1);
#else
next = scand->next_scanp++;
#ifdef DO_PARALLEL
if (scand->is_parallel)
SpinLockRelease(&scand->ps_mutex);
#endif

if (next >= scan_p->len)
return -1;

Expand Down Expand Up @@ -1375,9 +1375,9 @@ kafkaInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, voi
{
KafkaScanDataDesc * scand = (KafkaScanDataDesc *) coordinate;
KafkaFdwExecutionState *festate = (KafkaFdwExecutionState *) node->fdw_state;

scand->ps_relid = RelationGetRelid(node->ss.ss_currentRelation);
scand->next_scanp = 0;
SpinLockInit(&scand->ps_mutex);
pg_atomic_write_u32(&scand->next_scanp, 0);
festate->scan_data_desc = scand;
}

Expand All @@ -1386,23 +1386,14 @@ kafkaReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, v
{
KafkaScanDataDesc *scand = (KafkaScanDataDesc *) coordinate;

/*
* It shouldn't be necessary to acquire the mutex here, but we do it
* anyway, just to be tidy.
*/
SpinLockAcquire(&scand->ps_mutex);

scand->next_scanp = 0;

SpinLockRelease(&scand->ps_mutex);
pg_atomic_write_u32(&scand->next_scanp, 0);
}

static void
kafkaInitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, void *coordinate)
{
KafkaScanDataDesc * scand = (KafkaScanDataDesc *) coordinate;
KafkaFdwExecutionState *festate = (KafkaFdwExecutionState *) node->fdw_state;
scand->is_parallel = true;
festate->scan_data_desc = scand;
}

Expand Down
10 changes: 6 additions & 4 deletions src/kafka_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ typedef struct ParseOptions
/* scan koordination */
typedef struct KafkaScanDataDesc
{
Oid ps_relid; /* OID of relation to scan */
slock_t ps_mutex; /* mutual exclusion for next_scanp */
int next_scanp; /* next scanp to fetch */
bool is_parallel;
Oid ps_relid; /* OID of relation to scan */
#ifdef DO_PARALLEL
pg_atomic_uint32 next_scanp; /* next scanp to fetch */
#else
int next_scanp;
#endif
} KafkaScanDataDesc;

/*
Expand Down

0 comments on commit 39b0d46

Please sign in to comment.