Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions include/splinterdb/splinterdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,17 @@ typedef struct splinterdb_notification {
char opaque[SPLINTERDB_NOTIFICATION_BUFSIZE];
} __attribute__((__aligned__(8))) splinterdb_notification;

typedef void (*splinterdb_notification_callback)(
splinterdb_notification *notification);

// The caller owns notification storage and must keep it alive until completion.
// After completion, call splinterdb_notification_deinit before reusing or
// destroying the notification.
// destroying the notification. If completion may happen in another process, the
// notification object must be allocated in shared memory.
void
splinterdb_notification_init_blocking(splinterdb_notification *notification);

void
splinterdb_notification_init_polling(splinterdb_notification *notification,
void *user_data);

void
splinterdb_notification_init_callback(splinterdb_notification *notification,
splinterdb_notification_callback callback,
void *user_data);

void
splinterdb_notification_deinit(splinterdb_notification *notification);

Expand All @@ -224,7 +217,8 @@ splinterdb_notification_poll(const splinterdb_notification *notification,
int *status);

int
splinterdb_notification_wait(splinterdb_notification *notification);
splinterdb_notification_wait(splinterdb *kvs,
splinterdb_notification *notification);

void *
splinterdb_notification_user_data(const splinterdb_notification *notification);
Expand Down Expand Up @@ -346,8 +340,8 @@ splinterdb_update(splinterdb *kvsb,
// min_key or max_key means the range is unbounded on that side. If
// full_leaf_compactions is true, enqueue full compactions for leaves in the
// range after flushing. Passing a NULL notification makes this fire-and-forget.
// Blocking notifications wait before this function returns; polling and
// callback notifications complete later.
// Blocking notifications wait before this function returns; polling
// notifications complete later.
int
splinterdb_optimize(splinterdb *kvs,
slice min_key,
Expand Down
64 changes: 11 additions & 53 deletions src/notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
typedef enum notification_mode {
NOTIFICATION_MODE_BLOCKING,
NOTIFICATION_MODE_POLLING,
NOTIFICATION_MODE_CALLBACK,
} notification_mode;

typedef struct notification {
notification_mode mode;
bool32 complete;
platform_status status;
void *user_data;
splinterdb_notification_callback callback;
platform_condvar cv;
notification_mode mode;
bool32 complete;
platform_status status;
void *user_data;
platform_condvar cv;
} notification;

_Static_assert(sizeof(notification) <= sizeof(splinterdb_notification),
Expand Down Expand Up @@ -56,45 +54,33 @@ platform_status_to_int(const platform_status status)
}

static void
splinterdb_notification_init_common(splinterdb_notification *note,
notification_mode mode,
splinterdb_notification_callback callback,
void *user_data)
splinterdb_notification_init_common(splinterdb_notification *note,
notification_mode mode,
void *user_data)
{
notification *n = notification_from_splinterdb(note);

n->mode = mode;
n->complete = FALSE;
n->status = STATUS_OK;
n->user_data = user_data;
n->callback = callback;

platform_status rc = platform_condvar_init(&n->cv, PROCESS_PRIVATE_HEAP_ID);
platform_status rc = platform_condvar_init(&n->cv, TRUE);
platform_assert_status_ok(rc);
}

void
splinterdb_notification_init_blocking(splinterdb_notification *note)
{
splinterdb_notification_init_common(
note, NOTIFICATION_MODE_BLOCKING, NULL, NULL);
splinterdb_notification_init_common(note, NOTIFICATION_MODE_BLOCKING, NULL);
}

void
splinterdb_notification_init_polling(splinterdb_notification *note,
void *user_data)
{
splinterdb_notification_init_common(
note, NOTIFICATION_MODE_POLLING, NULL, user_data);
}

void
splinterdb_notification_init_callback(splinterdb_notification *note,
splinterdb_notification_callback callback,
void *user_data)
{
splinterdb_notification_init_common(
note, NOTIFICATION_MODE_CALLBACK, callback, user_data);
note, NOTIFICATION_MODE_POLLING, user_data);
}

void
Expand Down Expand Up @@ -125,27 +111,6 @@ splinterdb_notification_poll(const splinterdb_notification *note, int *status)
return complete;
}

int
splinterdb_notification_wait(splinterdb_notification *note)
{
notification *n = notification_from_splinterdb(note);

platform_status rc = platform_condvar_lock(&n->cv);
platform_assert_status_ok(rc);

while (!n->complete) {
rc = platform_condvar_wait(&n->cv);
platform_assert_status_ok(rc);
}

int status = platform_status_to_int(n->status);

rc = platform_condvar_unlock(&n->cv);
platform_assert_status_ok(rc);

return status;
}

void *
splinterdb_notification_user_data(const splinterdb_notification *note)
{
Expand Down Expand Up @@ -183,16 +148,9 @@ splinterdb_notification_complete(splinterdb_notification *note,
n->status = status;
n->complete = TRUE;

splinterdb_notification_callback callback =
n->mode == NOTIFICATION_MODE_CALLBACK ? n->callback : NULL;

rc = platform_condvar_broadcast(&n->cv);
platform_assert_status_ok(rc);

rc = platform_condvar_unlock(&n->cv);
platform_assert_status_ok(rc);

if (callback != NULL) {
callback(note);
}
}
8 changes: 3 additions & 5 deletions src/platform_linux/platform_condvar.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <pthread.h>

platform_status
platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id)
platform_condvar_init(platform_condvar *cv, bool32 process_shared)
{
platform_status status;
bool32 pth_condattr_setpshared_failed = FALSE;
Expand All @@ -23,8 +23,7 @@ platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id)

// clang-format off
status.r = pthread_mutex_init(&cv->lock,
((heap_id == PROCESS_PRIVATE_HEAP_ID)
? NULL : &mattr));
(process_shared ? &mattr : NULL));
// clang-format on
debug_only int rv = pthread_mutexattr_destroy(&mattr);
debug_assert(rv == 0);
Expand All @@ -44,8 +43,7 @@ platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id)

// clang-format off
status.r = pthread_cond_init(&cv->cond,
((heap_id == PROCESS_PRIVATE_HEAP_ID)
? NULL : &cattr));
(process_shared ? &cattr : NULL));
// clang-format on

rv = pthread_condattr_destroy(&cattr);
Expand Down
3 changes: 1 addition & 2 deletions src/platform_linux/platform_condvar.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#pragma once

#include "platform_status.h"
#include "platform_heap.h"
#include <pthread.h>

typedef struct platform_condvar {
Expand All @@ -13,7 +12,7 @@ typedef struct platform_condvar {
} platform_condvar;

platform_status
platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id);
platform_condvar_init(platform_condvar *cv, bool32 process_shared);

platform_status
platform_condvar_wait(platform_condvar *cv);
Expand Down
34 changes: 33 additions & 1 deletion src/splinterdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "notification.h"
#include "shard_log.h"
#include "splinterdb_tests_private.h"
#include "task.h"
#include "platform_typed_alloc.h"
#include "platform_assert.h"
#include "platform_sleep.h"
#include "platform_units.h"
#include "platform_threads.h"
#include "poison.h"
Expand Down Expand Up @@ -88,6 +90,36 @@ splinterdb_assert_thread_registered(void)
platform_assert_status_ok(rc);
}

int
splinterdb_notification_wait(splinterdb *kvs,
splinterdb_notification *notification)
{
int rc = splinterdb_ensure_thread_registered();
if (rc != 0) {
return rc;
}

platform_assert(kvs != NULL);
platform_assert(notification != NULL);

int status = 0;
uint64 backoff = 1;

while (!splinterdb_notification_poll(notification, &status)) {
platform_status task_rc = task_perform_one(&kvs->task_sys);
if (SUCCESS(task_rc)) {
backoff = 1;
} else if (STATUS_IS_EQ(task_rc, STATUS_TIMEDOUT)) {
platform_sleep_ns(backoff);
backoff = MIN(2 * backoff, 1 << 16);
} else {
return platform_status_to_int(task_rc);
}
}

return status;
}

static void
splinterdb_config_set_defaults(splinterdb_config *cfg)
{
Expand Down Expand Up @@ -743,7 +775,7 @@ splinterdb_optimize(splinterdb *kvs,
}

if (splinterdb_notification_is_blocking(notification)) {
return splinterdb_notification_wait(notification);
return splinterdb_notification_wait(kvs, notification);
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ task_group_init(task_group *group,
platform_heap_id hid = ts->heap_id;
platform_status rc;

rc = platform_condvar_init(&group->cv, hid);
rc = platform_condvar_init(&group->cv, hid != PROCESS_PRIVATE_HEAP_ID);
if (!SUCCESS(rc)) {
return rc;
}
Expand Down
30 changes: 21 additions & 9 deletions src/trunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -5745,15 +5745,21 @@ trunk_incorporate_cleanup(trunk_context *context)
trunk_flush_cleanup(context, NULL);
}

typedef struct trunk_flush_tracker {
task_tracker tracker;
splinterdb_notification *notification;
platform_heap_id hid;
} trunk_flush_tracker;

static void
trunk_flush_tracker_done(task_tracker *tracker)
{
splinterdb_notification *notification =
(splinterdb_notification *)tracker->user_data;
trunk_flush_tracker *flush_tracker =
(trunk_flush_tracker *)tracker->user_data;
platform_status status = tracker->failed ? tracker->status : STATUS_OK;

splinterdb_notification_complete(notification, status);
platform_free(PROCESS_PRIVATE_HEAP_ID, tracker);
splinterdb_notification_complete(flush_tracker->notification, status);
platform_free(flush_tracker->hid, flush_tracker);
}

platform_status
Expand All @@ -5776,13 +5782,17 @@ trunk_optimize(trunk_context *context,
return STATUS_OK;
}

task_tracker *tracker = NULL;
trunk_flush_tracker *flush_tracker = NULL;
task_tracker *tracker = NULL;
if (notification != NULL) {
tracker = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, tracker);
if (tracker == NULL) {
flush_tracker = TYPED_MALLOC(context->hid, flush_tracker);
if (flush_tracker == NULL) {
return STATUS_NO_MEMORY;
}
task_tracker_init(tracker, trunk_flush_tracker_done, notification);
flush_tracker->notification = notification;
flush_tracker->hid = context->hid;
tracker = &flush_tracker->tracker;
task_tracker_init(tracker, trunk_flush_tracker_done, flush_tracker);
}

trunk_flush_policy policy = {
Expand All @@ -5797,7 +5807,9 @@ trunk_optimize(trunk_context *context,

platform_status rc = trunk_flush_prepare(context, 0, &policy);
if (!SUCCESS(rc)) {
platform_free(PROCESS_PRIVATE_HEAP_ID, tracker);
if (flush_tracker != NULL) {
platform_free(context->hid, flush_tracker);
}
return rc;
}

Expand Down
3 changes: 2 additions & 1 deletion tests/unit/platform_apis_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ CTEST2(platform_api, test_platform_mutex_init_destroy)
CTEST2(platform_api, test_platform_condvar_init_destroy)
{
platform_condvar cv;
platform_status rc = platform_condvar_init(&cv, data->hid);
platform_status rc =
platform_condvar_init(&cv, data->hid != PROCESS_PRIVATE_HEAP_ID);
if (STATUS_IS_EQ(rc, STATUS_NOTSUP)) {
platform_error_log(
"Platform possibly does not support"
Expand Down
Loading
Loading