diff --git a/include/splinterdb/splinterdb.h b/include/splinterdb/splinterdb.h index 229cac44..fdb35d4e 100644 --- a/include/splinterdb/splinterdb.h +++ b/include/splinterdb/splinterdb.h @@ -198,12 +198,10 @@ typedef struct splinterdb_notification { char opaque[SPLINTERDB_NOTIFICATION_BUFSIZE]; } __attribute__((__aligned__(8))) splinterdb_notification; -typedef void (*splinterdb_notification_callback)( - splinterdb_notification *notification); - // The caller owns notification storage and must keep it alive until completion. // After completion, call splinterdb_notification_deinit before reusing or -// destroying the notification. +// destroying the notification. If completion may happen in another process, the +// notification object must be allocated in shared memory. void splinterdb_notification_init_blocking(splinterdb_notification *notification); @@ -211,11 +209,6 @@ void splinterdb_notification_init_polling(splinterdb_notification *notification, void *user_data); -void -splinterdb_notification_init_callback(splinterdb_notification *notification, - splinterdb_notification_callback callback, - void *user_data); - void splinterdb_notification_deinit(splinterdb_notification *notification); @@ -224,7 +217,8 @@ splinterdb_notification_poll(const splinterdb_notification *notification, int *status); int -splinterdb_notification_wait(splinterdb_notification *notification); +splinterdb_notification_wait(splinterdb *kvs, + splinterdb_notification *notification); void * splinterdb_notification_user_data(const splinterdb_notification *notification); @@ -346,8 +340,8 @@ splinterdb_update(splinterdb *kvsb, // min_key or max_key means the range is unbounded on that side. If // full_leaf_compactions is true, enqueue full compactions for leaves in the // range after flushing. Passing a NULL notification makes this fire-and-forget. -// Blocking notifications wait before this function returns; polling and -// callback notifications complete later. 
+// Blocking notifications wait before this function returns; polling +// notifications complete later. int splinterdb_optimize(splinterdb *kvs, slice min_key, diff --git a/src/notification.c b/src/notification.c index cb044697..a6b2ea17 100644 --- a/src/notification.c +++ b/src/notification.c @@ -18,16 +18,14 @@ typedef enum notification_mode { NOTIFICATION_MODE_BLOCKING, NOTIFICATION_MODE_POLLING, - NOTIFICATION_MODE_CALLBACK, } notification_mode; typedef struct notification { - notification_mode mode; - bool32 complete; - platform_status status; - void *user_data; - splinterdb_notification_callback callback; - platform_condvar cv; + notification_mode mode; + bool32 complete; + platform_status status; + void *user_data; + platform_condvar cv; } notification; _Static_assert(sizeof(notification) <= sizeof(splinterdb_notification), @@ -56,10 +54,9 @@ platform_status_to_int(const platform_status status) } static void -splinterdb_notification_init_common(splinterdb_notification *note, - notification_mode mode, - splinterdb_notification_callback callback, - void *user_data) +splinterdb_notification_init_common(splinterdb_notification *note, + notification_mode mode, + void *user_data) { notification *n = notification_from_splinterdb(note); @@ -67,17 +64,15 @@ splinterdb_notification_init_common(splinterdb_notification *note, n->complete = FALSE; n->status = STATUS_OK; n->user_data = user_data; - n->callback = callback; - platform_status rc = platform_condvar_init(&n->cv, PROCESS_PRIVATE_HEAP_ID); + platform_status rc = platform_condvar_init(&n->cv, TRUE); platform_assert_status_ok(rc); } void splinterdb_notification_init_blocking(splinterdb_notification *note) { - splinterdb_notification_init_common( - note, NOTIFICATION_MODE_BLOCKING, NULL, NULL); + splinterdb_notification_init_common(note, NOTIFICATION_MODE_BLOCKING, NULL); } void @@ -85,16 +80,7 @@ splinterdb_notification_init_polling(splinterdb_notification *note, void *user_data) { 
splinterdb_notification_init_common( - note, NOTIFICATION_MODE_POLLING, NULL, user_data); -} - -void -splinterdb_notification_init_callback(splinterdb_notification *note, - splinterdb_notification_callback callback, - void *user_data) -{ - splinterdb_notification_init_common( - note, NOTIFICATION_MODE_CALLBACK, callback, user_data); + note, NOTIFICATION_MODE_POLLING, user_data); } void @@ -125,27 +111,6 @@ splinterdb_notification_poll(const splinterdb_notification *note, int *status) return complete; } -int -splinterdb_notification_wait(splinterdb_notification *note) -{ - notification *n = notification_from_splinterdb(note); - - platform_status rc = platform_condvar_lock(&n->cv); - platform_assert_status_ok(rc); - - while (!n->complete) { - rc = platform_condvar_wait(&n->cv); - platform_assert_status_ok(rc); - } - - int status = platform_status_to_int(n->status); - - rc = platform_condvar_unlock(&n->cv); - platform_assert_status_ok(rc); - - return status; -} - void * splinterdb_notification_user_data(const splinterdb_notification *note) { @@ -183,16 +148,9 @@ splinterdb_notification_complete(splinterdb_notification *note, n->status = status; n->complete = TRUE; - splinterdb_notification_callback callback = - n->mode == NOTIFICATION_MODE_CALLBACK ? 
n->callback : NULL; - rc = platform_condvar_broadcast(&n->cv); platform_assert_status_ok(rc); rc = platform_condvar_unlock(&n->cv); platform_assert_status_ok(rc); - - if (callback != NULL) { - callback(note); - } } diff --git a/src/platform_linux/platform_condvar.c b/src/platform_linux/platform_condvar.c index e9410b7a..1c0f5baa 100644 --- a/src/platform_linux/platform_condvar.c +++ b/src/platform_linux/platform_condvar.c @@ -8,7 +8,7 @@ #include platform_status -platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id) +platform_condvar_init(platform_condvar *cv, bool32 process_shared) { platform_status status; bool32 pth_condattr_setpshared_failed = FALSE; @@ -23,8 +23,7 @@ platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id) // clang-format off status.r = pthread_mutex_init(&cv->lock, - ((heap_id == PROCESS_PRIVATE_HEAP_ID) - ? NULL : &mattr)); + (process_shared ? &mattr : NULL)); // clang-format on debug_only int rv = pthread_mutexattr_destroy(&mattr); debug_assert(rv == 0); @@ -44,8 +43,7 @@ platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id) // clang-format off status.r = pthread_cond_init(&cv->cond, - ((heap_id == PROCESS_PRIVATE_HEAP_ID) - ? NULL : &cattr)); + (process_shared ? 
&cattr : NULL)); // clang-format on rv = pthread_condattr_destroy(&cattr); diff --git a/src/platform_linux/platform_condvar.h b/src/platform_linux/platform_condvar.h index da8376b6..073d64ab 100644 --- a/src/platform_linux/platform_condvar.h +++ b/src/platform_linux/platform_condvar.h @@ -4,7 +4,6 @@ #pragma once #include "platform_status.h" -#include "platform_heap.h" #include typedef struct platform_condvar { @@ -13,7 +12,7 @@ typedef struct platform_condvar { } platform_condvar; platform_status -platform_condvar_init(platform_condvar *cv, platform_heap_id heap_id); +platform_condvar_init(platform_condvar *cv, bool32 process_shared); platform_status platform_condvar_wait(platform_condvar *cv); diff --git a/src/splinterdb.c b/src/splinterdb.c index 581fedba..10addc51 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -22,8 +22,10 @@ #include "notification.h" #include "shard_log.h" #include "splinterdb_tests_private.h" +#include "task.h" #include "platform_typed_alloc.h" #include "platform_assert.h" +#include "platform_sleep.h" #include "platform_units.h" #include "platform_threads.h" #include "poison.h" @@ -88,6 +90,36 @@ splinterdb_assert_thread_registered(void) platform_assert_status_ok(rc); } +int +splinterdb_notification_wait(splinterdb *kvs, + splinterdb_notification *notification) +{ + int rc = splinterdb_ensure_thread_registered(); + if (rc != 0) { + return rc; + } + + platform_assert(kvs != NULL); + platform_assert(notification != NULL); + + int status = 0; + uint64 backoff = 1; + + while (!splinterdb_notification_poll(notification, &status)) { + platform_status task_rc = task_perform_one(&kvs->task_sys); + if (SUCCESS(task_rc)) { + backoff = 1; + } else if (STATUS_IS_EQ(task_rc, STATUS_TIMEDOUT)) { + platform_sleep_ns(backoff); + backoff = MIN(2 * backoff, 1 << 16); + } else { + return platform_status_to_int(task_rc); + } + } + + return status; +} + static void splinterdb_config_set_defaults(splinterdb_config *cfg) { @@ -743,7 +775,7 @@ 
splinterdb_optimize(splinterdb *kvs, } if (splinterdb_notification_is_blocking(notification)) { - return splinterdb_notification_wait(notification); + return splinterdb_notification_wait(kvs, notification); } return 0; diff --git a/src/task.c b/src/task.c index 97d17ca8..dc298c14 100644 --- a/src/task.c +++ b/src/task.c @@ -275,7 +275,7 @@ task_group_init(task_group *group, platform_heap_id hid = ts->heap_id; platform_status rc; - rc = platform_condvar_init(&group->cv, hid); + rc = platform_condvar_init(&group->cv, hid != PROCESS_PRIVATE_HEAP_ID); if (!SUCCESS(rc)) { return rc; } diff --git a/src/trunk.c b/src/trunk.c index a723a59c..d92e5200 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -5745,15 +5745,21 @@ trunk_incorporate_cleanup(trunk_context *context) trunk_flush_cleanup(context, NULL); } +typedef struct trunk_flush_tracker { + task_tracker tracker; + splinterdb_notification *notification; + platform_heap_id hid; +} trunk_flush_tracker; + static void trunk_flush_tracker_done(task_tracker *tracker) { - splinterdb_notification *notification = - (splinterdb_notification *)tracker->user_data; + trunk_flush_tracker *flush_tracker = + (trunk_flush_tracker *)tracker->user_data; platform_status status = tracker->failed ? 
tracker->status : STATUS_OK; - splinterdb_notification_complete(notification, status); - platform_free(PROCESS_PRIVATE_HEAP_ID, tracker); + splinterdb_notification_complete(flush_tracker->notification, status); + platform_free(flush_tracker->hid, flush_tracker); } platform_status @@ -5776,13 +5782,17 @@ trunk_optimize(trunk_context *context, return STATUS_OK; } - task_tracker *tracker = NULL; + trunk_flush_tracker *flush_tracker = NULL; + task_tracker *tracker = NULL; if (notification != NULL) { - tracker = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, tracker); - if (tracker == NULL) { + flush_tracker = TYPED_MALLOC(context->hid, flush_tracker); + if (flush_tracker == NULL) { return STATUS_NO_MEMORY; } - task_tracker_init(tracker, trunk_flush_tracker_done, notification); + flush_tracker->notification = notification; + flush_tracker->hid = context->hid; + tracker = &flush_tracker->tracker; + task_tracker_init(tracker, trunk_flush_tracker_done, flush_tracker); } trunk_flush_policy policy = { @@ -5797,7 +5807,9 @@ trunk_optimize(trunk_context *context, platform_status rc = trunk_flush_prepare(context, 0, &policy); if (!SUCCESS(rc)) { - platform_free(PROCESS_PRIVATE_HEAP_ID, tracker); + if (flush_tracker != NULL) { + platform_free(context->hid, flush_tracker); + } return rc; } diff --git a/tests/unit/platform_apis_test.c b/tests/unit/platform_apis_test.c index 078ec58d..7645da57 100644 --- a/tests/unit/platform_apis_test.c +++ b/tests/unit/platform_apis_test.c @@ -261,7 +261,8 @@ CTEST2(platform_api, test_platform_mutex_init_destroy) CTEST2(platform_api, test_platform_condvar_init_destroy) { platform_condvar cv; - platform_status rc = platform_condvar_init(&cv, data->hid); + platform_status rc = + platform_condvar_init(&cv, data->hid != PROCESS_PRIVATE_HEAP_ID); if (STATUS_IS_EQ(rc, STATUS_NOTSUP)) { platform_error_log( "Platform possibly does not support" diff --git a/tests/unit/splinterdb_optimize_test.c b/tests/unit/splinterdb_optimize_test.c index a72d0f64..a8587cf6 
100644 --- a/tests/unit/splinterdb_optimize_test.c +++ b/tests/unit/splinterdb_optimize_test.c @@ -12,7 +12,6 @@ #include "splinterdb/default_data_config.h" #include "splinterdb/splinterdb.h" -#include "platform_sleep.h" #include "platform_threads.h" #include "splinterdb_tests_private.h" #include "unit_tests.h" @@ -28,12 +27,6 @@ static const char optimize_key_fmt[] = "key-%018u"; static const char optimize_value_fmt[] = "value-%018u-marker"; -typedef struct optimize_callback_state { - volatile uint64 callbacks; - int status; - splinterdb_notification *notification; -} optimize_callback_state; - typedef struct optimize_thread_args { splinterdb *kvsb; splinterdb_notification *notification; @@ -67,9 +60,6 @@ verify_point_lookups(splinterdb *kvsb, uint32 num_keys); static void verify_full_scan(splinterdb *kvsb, uint32 num_keys); -static void -optimize_callback(splinterdb_notification *notification); - static void * optimize_thread(void *arg); @@ -133,6 +123,30 @@ CTEST2(splinterdb_optimize, test_blocking_without_full_leaf_compactions) verify_full_scan(data->kvsb, num_keys); } +CTEST2(splinterdb_optimize, test_blocking_with_no_background_threads) +{ + const uint32 num_keys = 320; + + splinterdb_close(&data->kvsb); + data->cfg.num_memtable_bg_threads = 0; + data->cfg.num_normal_bg_threads = 0; + + int rc = splinterdb_create(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + load_key_batches(data->kvsb, num_keys, 40); + + splinterdb_notification notification; + splinterdb_notification_init_blocking(&notification); + rc = splinterdb_optimize( + data->kvsb, NULL_SLICE, NULL_SLICE, TRUE, &notification); + ASSERT_EQUAL(0, rc); + splinterdb_notification_deinit(&notification); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + CTEST2(splinterdb_optimize, test_open_reads_disk_geometry) { const uint32 num_keys = 160; @@ -174,7 +188,7 @@ CTEST2(splinterdb_optimize, test_polling_subrange) ASSERT_TRUE(&user_data == 
splinterdb_notification_user_data(&notification)); int status = EINVAL; - rc = splinterdb_notification_wait(&notification); + rc = splinterdb_notification_wait(data->kvsb, &notification); ASSERT_EQUAL(0, rc); ASSERT_TRUE(splinterdb_notification_poll(&notification, &status)); ASSERT_EQUAL(0, status); @@ -185,48 +199,6 @@ CTEST2(splinterdb_optimize, test_polling_subrange) verify_full_scan(data->kvsb, num_keys); } -CTEST2(splinterdb_optimize, test_callback_completion) -{ - const uint32 num_keys = 320; - char min_key[OPTIMIZE_TEST_KEY_SIZE]; - char max_key[OPTIMIZE_TEST_KEY_SIZE]; - - load_key_batches(data->kvsb, num_keys, 40); - format_key(min_key, 0); - format_key(max_key, num_keys); - - optimize_callback_state callback_state = {0}; - splinterdb_notification notification; - splinterdb_notification_init_callback( - &notification, optimize_callback, &callback_state); - - int rc = splinterdb_optimize(data->kvsb, - slice_create(strlen(min_key), min_key), - slice_create(strlen(max_key), max_key), - TRUE, - &notification); - ASSERT_EQUAL(0, rc); - - rc = splinterdb_notification_wait(&notification); - ASSERT_EQUAL(0, rc); - - for (uint64 i = 0; - i < 100000 && __sync_fetch_and_add(&callback_state.callbacks, 0) == 0; - i++) - { - platform_sleep_ns(1000); - } - - ASSERT_EQUAL(1, __sync_fetch_and_add(&callback_state.callbacks, 0)); - ASSERT_EQUAL(0, callback_state.status); - ASSERT_TRUE(&notification == callback_state.notification); - - splinterdb_notification_deinit(&notification); - - verify_point_lookups(data->kvsb, num_keys); - verify_full_scan(data->kvsb, num_keys); -} - CTEST2(splinterdb_optimize, test_concurrent_overlapping_ranges) { const uint32 num_keys = 320; @@ -263,7 +235,7 @@ CTEST2(splinterdb_optimize, test_concurrent_overlapping_ranges) } for (uint32 i = 0; i < num_threads; i++) { - rc = splinterdb_notification_wait(&notifications[i]); + rc = splinterdb_notification_wait(data->kvsb, &notifications[i]); ASSERT_EQUAL(0, rc); ASSERT_TRUE(&args[i] == 
splinterdb_notification_user_data(&notifications[i])); @@ -415,19 +387,6 @@ verify_full_scan(splinterdb *kvsb, uint32 num_keys) splinterdb_iterator_deinit(itor); } -static void -optimize_callback(splinterdb_notification *notification) -{ - optimize_callback_state *state = - splinterdb_notification_user_data(notification); - int status = EINVAL; - - platform_assert(splinterdb_notification_poll(notification, &status)); - state->status = status; - state->notification = notification; - __sync_fetch_and_add(&state->callbacks, 1); -} - static void * optimize_thread(void *arg) {