
[RFC] Zbus: a message bus system #45910

Closed
rodrigopex opened this issue May 23, 2022 · 44 comments
Labels
area: API Changes to public APIs RFC Request For Comments: want input from the community

Comments

@rodrigopex
Contributor

rodrigopex commented May 23, 2022

Introduction

Embedded systems are everywhere, playing different roles in our lives. Embedded software grows in complexity as technology evolves. To keep up with this increasing complexity, practitioners and researchers have launched numerous development solutions: frameworks, libraries, RTOSes, and so forth. Zephyr is one of these solutions. However, like the majority of popular RTOSes (see the table below), Zephyr offers only a limited set of Inter-Process Communication (IPC) mechanisms, which makes development hard when dealing with multi-threaded systems. Because of that, developers sometimes add coupled code to reduce the number of threads in a solution. Communication between threads becomes a nightmare when the system requires a more complex inter-thread communication approach; for example, there is no straightforward way to achieve decoupled many-to-many thread communication in Zephyr. Linux solves part of this problem with D-Bus, a message bus system that gives applications a simple way to talk to one another. I suggest an alternative message bus with a focus on speed, memory, and energy consumption.

| RTOS | Shared memory | Message Queue | Stream/Pipe | Mailbox | Event Flag/Signal | Comments | Reference |
|---|---|---|---|---|---|---|---|
| Amazon FreeRTOS | true | true | true | false | true |  | https://freertos.org/a00106.html |
| Mbed | true | true | false | false | true |  | https://os.mbed.com/docs/mbed-os/v6.15/apis/scheduling-rtos-and-event-handling.html |
| RIOT | true | true | false | false | false |  | https://doc.riot-os.org/group__core.html |
| NuttX | true | true | true | false | true | FILE stream is very powerful | https://nuttx.apache.org/docs/latest/reference/index.html |
| Zephyr | true | true | true | true | true | Event flag is made by k_poll_signal | https://docs.zephyrproject.org/3.0.0/reference/kernel/index.html#data-passing |
| Azure RTOS (ThreadX) | true | true | false | false | true |  | https://docs.microsoft.com/pt-br/azure/rtos/threadx/chapter4 |

Problem description

To the best of my knowledge, Zephyr does not offer any IPC capable of many-to-many thread communication, and there is no straightforward way to build one in a decoupled fashion. The closest available IPC is the mailbox, but it does not actually deliver a message to many readers; only the first reader receives it. As a result, developers must reinvent the wheel every time they need many-to-many thread communication.

Proposed change

Add a fast and decoupled inter-process communication mechanism (a message bus, hereafter referred to as zbus, from "Zephyr-bus") to Zephyr to enable a many-to-many communication model. Using this IPC, developers will be able to set up thread communication easily, even in a many-to-many model. Beyond the communication capabilities, the proposed bus offers asynchronous, structured communication with time, space, and synchronization decoupling between the communicating entities. Even ISRs (Interrupt Service Routines) can send messages through the bus. In the current implementation, the bus is built from a statically allocated shared-memory region, semaphores, and message queues working together.

Detailed RFC

This RFC describes zbus, a message bus aimed at improving the communication capabilities of Zephyr threads. It is implemented on top of the publish/subscribe pattern. Message data is transmitted using a managed shared-memory approach, which provides good performance. The figure below provides an overview of zbus.

Threads can perform only three operations on channels: publish, subscribe, and read. A thread publishes or reads a channel when it wants to change or read the channel's message, respectively. To know when a channel's message has changed, a thread can subscribe to the channel and receive that notification. Publishing and reading happen at execution time, but subscribing must be done at compile time.

Proposed change (Detailed)

The figure below presents the internal details of zbus. For simplicity, a single internal thread keeps track of changes and notifies the subscribers when a channel is published. The components are described as follows:

  • Event dispatcher is the thread responsible for monitoring the change queue and sending the notifications to the subscribers' threads. The subscribers are mapped to channels at the subscribers' table;
  • Channel changed queue is a message queue where, after publishing, the changed channel's id will be passed. The Event dispatcher uses this message queue to keep the subscribers updated about the channels;
  • Channels structure stores all the channel data. This is a shared memory portion that would be globally accessed, but not directly;
  • Channel is formed by metadata and message:
    • Metadata is the information related to a channel to help the bus to work. Channels can be:
      • Read-only: the channel cannot be published. The only way to set its content is via the initial message value. It is useful for constants, such as a version;
      • On change: with this flag set to true, the event dispatcher will only notify the threads if the channel's message actually changes. If someone publishes the same message value to the same channel, the dispatcher ignores the publication and does not notify the subscribers;
      • Message size: this field stores the channel's message size;
      • Message reference: this field stores the channel's message reference;
      • Semaphore: this field references the semaphore used to manage the channel's access;
      • Subscribers: this field stores the channel's subscribers list;
    • Message is the information exchanged by threads.

The actions available for a thread in zbus are:

  • Subscribe: when a thread needs to keep track of changes to some channel, it subscribes to it (compile-time). Whenever necessary, based on the On change flag, the event dispatcher notifies the subscribers. The notification carries only an indication of the changed channel (an id, mapped here to the zbus_index_<channel's name> value), not the message itself. Reading the message is a discretionary action by the subscriber; sometimes a subscriber is not interested in the message content, only in knowing that it changed. This design improves speed and lets higher-priority threads access the message first. All threads are notified at the same time, but the order in which they react depends on their priority;
  • Publish: when a thread wants to change a channel's message, it publishes to it (execution-time). Type checking happens before this action: if the developer uses the wrong message type, a compilation error is raised;
  • Read: when a thread is interested in a channel's message, it reads from the channel and retrieves the stored message without changing it (execution-time). The same type checking applied to publishing is done here.

The type checking for publishing (the reading is similar) is done by the code illustrated as follows:

#define zbus_chan_pub(chan, msg, timeout)                                               \
    ({                                                                                  \
        {                                                                               \
            __typeof__(__zbus_channels_instance()->chan) chan##__aux__;                   \
            __typeof__(msg) msg##__aux__;                                           \
            (void) (&chan##__aux__ == &msg##__aux__);                                 \
        }                                                                               \
        __ZBUS_LOG_DBG("[ZBUS] %s pub " #chan " at %s:%d", (k_is_in_isr() ? "ISR" : ""),  \
                     __FILE__, __LINE__);                                               \
        __zbus_chan_pub(ZBUS_CHANNEL_METADATA_GET(chan), (uint8_t *) &msg, sizeof(msg), \
                      timeout);                                                         \
    })

API description

To use zbus in its current implementation, the developer defines the messages and channels by creating the zbus_messages.h and zbus_channels.h files. The messages file must contain all the types used to define the messages, as well as the message definitions themselves. Channels are defined by filling in the parameters of a macro, as in the code below:

ZBUS_CHANNEL(<channel's name>,
             <persistent>,
             <on_change>,
             <read_only>,
             <message type>,
             ZBUS_CHANNEL_SUBSCRIBERS(<subscribers list>),
             ZBUS_INIT(<initial message value>)
)

The <subscribers list> is a list of the subscribers' message queues. To have a thread receive notifications for a channel, pass one of its queues here; the thread then reads channel ids from that queue to learn about change events.
The <initial message value> can be a struct initialization or just a zero for default initialization, for example ZBUS_INIT(.field1 = 10, .field2 = false). After the channel definitions, zbus provides zbus_channel_index_t, an enum with the channels' ids. The id generated for a channel is zbus_index_<channel's name>, of type zbus_channel_index_t. The event dispatcher uses these ids to send notifications.

Publishing and reading are performed by calling the macros zbus_chan_pub(<channel's name>, <message>, <timeout>) and zbus_chan_read(<channel's name>, <message>, <timeout>). The fields are:

  • <channel's name>: the name of the channel, without quotes;
  • <message>: the value of the channel's message, not a reference to it. The macro does whatever is needed;
  • <timeout>: a regular Zephyr timeout. For ISR calls you must use K_NO_WAIT.

Example of use

This example illustrates the ways subscribers can react to a notification. Here we have an immediate callback, a work queue callback, and a thread consuming the notification. The sensors thread generates data samples and publishes them to the sensor_data channel. All subscribers receive the notification, and callbacks are executed directly from the event dispatcher (high priority). The event dispatcher must have the highest priority among the user's threads to guarantee proper execution. The developer must therefore be careful with callback subscribers, because they run in the event dispatcher's context (with high priority). A good approach for low-priority actions is to perform the actual work in a work queue: when executed, the callback only submits the work item, avoiding long processing in the dispatcher context.

Messages definition file. Here we have the version and sensor messages:

#ifndef _ZBUS_MESSAGES_H_
#define _ZBUS_MESSAGES_H_
#include <zephyr.h>

struct version {
    uint8_t major;
    uint8_t minor;
    uint16_t build;
};

struct sensor_msg {
    uint32_t temp;
    uint32_t press;
    uint32_t humidity;
};

#endif  // _ZBUS_MESSAGES_H_

Channels definition file. Here we have the read-only version channel and the sensor_data channel:

ZBUS_CHANNEL(version,                        /* Name */
             false,                          /* Persistent */
             false,                          /* On changes only */
             true,                           /* Read only */
             struct version,                 /* Message type */
             ZBUS_CHANNEL_SUBSCRIBERS_EMPTY, /* Subscribers */
             ZBUS_INIT(.major = 0, .minor = 1,
                       .build = 1023) /* Initial value major 0, minor 1, build 1023 */
)

ZBUS_CHANNEL(sensor_data,       /* Name */
             false,             /* Persistent */
             false,             /* On changes only */
             false,             /* Read only */
             struct sensor_msg, /* Message type */
             ZBUS_CHANNEL_SUBSCRIBERS(fast_handler1, 
                                      delay_handler1, 
                                      thread_handler1), /* Subscribers */
             ZBUS_INIT(0)                               /* Initial value {0} */
)

Assuming a trivial implementation of a sensor thread that generates sensor samples and publishes them to the sensor_data channel:

#include <logging/log.h>
#include "zbus.h"
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

void peripheral_thread(void)
{
    struct sensor_msg sm = {0};
    while (1) {
        sm.press += 1;
        sm.temp += 10;
        sm.humidity += 100;
        LOG_DBG("Sending sensor data...");
        zbus_chan_pub(sensor_data, sm, K_MSEC(250));
        // k_msleep(10);
    }
}

K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 5, 0, 0);

Assuming a trivial implementation of the subscribers using different approaches, based on a callback, a work queue, and a thread:

#include <stdint.h>
#include <logging/log.h>
#include "kernel.h"
#include "sys/util_macro.h"
#include "zbus.h"
#include "zbus_messages.h"
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

void fh1_cb(zbus_channel_index_t idx);
ZBUS_SUBSCRIBER_REGISTER_CALLBACK(fast_handler1, fh1_cb);

void dh1_cb(zbus_channel_index_t idx);
ZBUS_SUBSCRIBER_REGISTER_CALLBACK(delay_handler1, dh1_cb);

struct sensor_msg msg = {0};
void fh1_cb(zbus_channel_index_t idx)
{
    zbus_chan_read_by_index_unsafe(idx, msg, K_NO_WAIT);
    printk("Sensor msg processed by CALLBACK fh1: temp = %u, press = %u, humidity = %u\n",
           msg.temp, msg.press, msg.humidity);
}

struct sensor_wq_info {
    struct k_work work;
    zbus_channel_index_t idx;
    uint8_t handle;
};

struct sensor_wq_info wq_handler1 = {.handle = 1};

void wq_dh_cb(struct k_work *item)
{
    struct sensor_wq_info *sens = CONTAINER_OF(item, struct sensor_wq_info, work);
    zbus_chan_read_by_index_unsafe(sens->idx, msg, K_NO_WAIT);
    printk("Sensor msg processed by WORK QUEUE handler dh%u: temp = %u, press = %u, "
           "humidity = %u\n",
           sens->handle, msg.temp, msg.press, msg.humidity);
}

void dh1_cb(zbus_channel_index_t idx)
{
    wq_handler1.idx = idx;
    k_work_submit(&wq_handler1.work);
}

void main(void)
{
    k_work_init(&wq_handler1.work, wq_dh_cb);

    struct version v = {0};
    zbus_chan_read(version, v, K_NO_WAIT);

    LOG_DBG("Sensor sample started, version %u.%u-%u!", v.major, v.minor, v.build);
}

ZBUS_SUBSCRIBER_REGISTER(thread_handler1, 4);
void thread_handler1_task(void)
{
    zbus_channel_index_t idx = ZBUS_CHANNEL_COUNT;
    while (!k_msgq_get(thread_handler1.queue, &idx, K_FOREVER)) {
        zbus_chan_read_by_index_unsafe(idx, msg, K_NO_WAIT);
        printk("Sensor msg processed by THREAD handler: temp = %u, press = %u, "
               "humidity = %u\n",
               msg.temp, msg.press, msg.humidity);
    }
}

K_THREAD_DEFINE(thread_handler1_id, 1024, thread_handler1_task, NULL, NULL, NULL, 3, 0,
                0);

The sequence of activities based on the code above is illustrated as follows.

The sequence starts with the sensors thread generating a sample and publishing it to the sensor_data channel. The bus immediately executes the callbacks: one of them runs its handling directly, while the other submits work to the system work queue. After the callbacks, the system notifies the thread handler about the change by sending it the id of the sensor_data channel. Assuming the system work queue has a higher priority than the thread handler, it executes the work first. The thread handler then wakes up, reads the content of the channel, and prints it. These actions run in a loop and repeat indefinitely.

Benchmark

The benchmark was designed to transfer 256 KB (262144 bytes) from a producer to a consumer. The only variable was the size of the channel's message, ranging from 1 byte to 256 bytes. The benchmark was executed on a hifive1_revb board with Zephyr v3.0.0; the code is in the repository samples.

| Message size (bytes) | Callback average (ms) | Callback average data rate (B/s) | Thread average (ms) | Thread average data rate (B/s) |
|---|---|---|---|---|
| 1 | 41797.00 | 6124.84 | 58322.67 | 4389.37 |
| 2 | 21096.67 | 12134.62 | 29263.00 | 8748.25 |
| 4 | 10656.00 | 24024.02 | 14734.67 | 17373.99 |
| 8 | 5450.67 | 46966.73 | 7478.33 | 34232.23 |
| 16 | 2838.67 | 90183.18 | 3859.00 | 66338.43 |
| 32 | 1531.00 | 167210.97 | 2044.33 | 125224.20 |
| 64 | 880.33 | 290798.94 | 1133.00 | 225948.81 |
| 128 | 555.00 | 461261.26 | 680.00 | 376470.59 |
| 256 | 391.00 | 654731.46 | 453.00 | 565121.41 |

The table shows how the message size impacts transfer time. Another influential factor is the subscriber style: as expected, transfers are faster with synchronous subscribers and slower with asynchronous ones. In the best-case scenario, a developer can expect to move roughly 4000 to 600000 bytes per second using zbus, depending on the number of threads, the channel sizes, the subscriber style, and other factors. With a careful design it works properly in many usage scenarios, for example Bluetooth scanning, sensor capture, streaming of bytes, etc.

Benefits

In this section I will describe some benefits of using zbus:

  1. Increased abstraction: zbus performs one-to-one, one-to-many, and many-to-many communication models seamlessly. The developer can use zbus alone for almost all the communication needed. The only limitation is performance, but for control communication it seems to be enough (this needs more performance measurements);
  2. Threads do not need to know each other: communication is oriented by channel, not by destination. A channel is defined by a struct, so it works as an API; by using C structs as the message definition, we implicitly get an Interface Definition Language. Compile-time type checking also prevents using the wrong message types when publishing and reading;
  3. Well-known publish/subscribe pattern: it is becoming common in the embedded systems field with the advent of the Internet of Things, so I believe adoption friction will be low;
  4. Improved testability: developers can easily replace threads with stubs or mocks that act as the real ones. This increases the observability and controllability of the code, by inspecting the messages exchanged and injecting messages, respectively;
  5. All communication can be done in an asynchronous, reactive way, making it easy to build power-efficient systems.

Extensibility

The bus is designed to be extensible, enabling it to be monitored and even replicated to different targets. It is possible to capture all the messages exchanged over the bus, and to inject messages as well. It is also possible to replicate the changes from one bus to another over an interface such as serial or BLE. For now this is left to the developer; there is no code in zbus related to the replication process.

Future work

I imagine that using zbus will increase the abstraction and reusability of Zephyr threads. A set of correlated channels forms a Port, meaning the Port carries all the APIs needed to use some thread (as a service). A device driver interface could be written using zbus; it would be easy to use, without adding extra driver API calls to the user code. The sensor driver API is one example: the fetch trigger, the data, and other related things could all be channels. Imagine the code below could be real:

// ...
void button_pressed(const struct device *dev, struct gpio_callback *cb,
		    uint32_t pins)
{
    struct trigger_msg fetch = {true};
    zbus_chan_pub(BME280_SENSOR_FETCH, fetch, K_NO_WAIT);
}
void some_thread(void)
{
    zbus_channel_index_t idx = 0;
    struct bme280_msg sensor_data = {0};
    while (1) {
        if (!k_msgq_get(&some_thread_queue, &idx, K_FOREVER)) {
            if (idx == zbus_index_bme280_sensor_fetch_done) {
                zbus_chan_read(BME280_SENSOR_DATA, sensor_data, K_MSEC(100));
                // you can read sensor data by:
                // sensor_data.temp
                // sensor_data.press
                // sensor_data.humidity
            }
        }
    }
}
// ...

Maybe all of the repetitive and error-prone device initialization could be done by the driver and driven entirely by the DTS. No sensor API calls and no sensor initialization code would be needed: just the "service" enabled in the DTS and everything running properly. It would be necessary to add an abstraction layer on top of the bus that initializes and manages the "BME280 sensor service". I did this for GNSS in my tests, and I could swap the GNSS module with no changes on the consumer side; the "GNSS service API" stayed the same, only the adapter had to change. I come from both industry and academia. I am currently a Ph.D. candidate, and my work is to define an architecture that enables great maintainability and abstraction through software engineering techniques; zbus is part of that, and this sensor abstraction example is a small taste of it.

Dependencies

The implementation depends only on semaphores and message queues. The rest of the code is plain C, with no dynamic allocation.

Concerns and Unresolved Questions

The main concerns about the solution:

  • The heavy use of the preprocessor may generate unreadable compilation errors when the developer makes a mistake;
  • There is no pub/sub loop checking. This is not an easy task, considering that a loop can occur through a chain of calls. If a thread both subscribes and publishes to the same channel, a high-priority loop may occur (the event dispatcher must have a high priority), causing a kind of starvation;
  • The bus is not made for streaming purposes; it should be used for control messages only. Performance still needs to be measured;
  • It will increase the footprint of the solution. Each channel allocates the message structure plus a 32-byte metadata portion, and each subscriber allocates a message queue to receive change notifications;
  • I have implemented systems using this approach before, and the results were very interesting to me, but the code needs many more tests than the initial ones I have done.

Alternatives

I could not find any direct alternative with this set of features able to run on constrained devices.

Community alternatives suggestions:

The table below is a superficial comparison of the suggested alternatives. The comparison may be biased, because I do not know the Event Manager as well as I know zbus.

| Comparison item | zbus | Event Manager (NCS) | Laird messaging bus |
|---|---|---|---|
| Made for | Increasing code quality by enabling reuse and increasing testability | Reducing the number of threads using events and callbacks | Multiple-receiver broadcast messaging framework system |
| Metaphor | Messaging bus | Event manager | Event manager |
| Message definition time | Compile-time | Compile-time | Compile-time, though a custom message could be created dynamically; the receivers would need to know how to parse the struct |
| Message definition approach | Centralized, a single file describing channels and subscriptions | Decentralized, one file per message; subscription happens in every place | Decentralized, multiple files, usually one per module, all combined into an auto-generated output file by CMake |
| Message allocation style | Static (compile-time) | Dynamic or static (execution-time; it uses a "weak" alloc function) | Dynamic (execution-time) |
| Message persistency | Persistent, still exists after being processed | Transient, deleted after being processed | Both: cleaned up if DISPATCH_OK is returned by a handler, otherwise remains |
| Message distribution pattern (see Reference [1]) | Publish/subscribe | Message passing (event/listener) | Message passing (event/listener) |
| Subscription style | Compile-time, but it can be disabled at execution-time by masking the subscriber | Compile-time, but run-time could be added using extension hooks | Compile-time, but execution-time filtering was planned and partially added |
| Message transmission style | Direct transmission in the callback style. Two-step for asynchronous transmission: first the event dispatcher sends the id of the changed channel to the subscriber, then the subscriber decides whether to read the content. The transmission order is defined by the subscriber's position in the subscribing list | Direct transmission. It transmits the data to the listeners one by one; you can define priority to regulate the transmission order | Direct transmission. Sends an event to listeners one by one for them to process (if a broadcast); unicast goes to a single listener. The order is defined by the order in which listeners register themselves at startup |
| Subscriber execution styles | Synchronous (by callback), asynchronous (by queue) | Synchronous (by callback) | Asynchronous (by queue) |
| Implementation approach | Static memory, semaphores, and queues | Dynamic memory, kernel spinlock, LD files, sys APIs | Dynamic memory (buffer pool), message queues |
| Use of code generation (tools/scripts) | No, only macros | No, only macros | CMake used for autogenerating 3 sets of output files: message IDs, message codes, and message types (containing structs), which can be globally included |
| Extension mechanism (you can add your own functionality) | Yes (uart_bridge, remote_mock) | Yes (app_event_manager_profiler_tracer, event_manager_proxy) | Not built in |
| Maturity | Feature-complete | Production-ready | Production-ready |

Initial implementation

The current implementation relies on several preprocessor macros and will possibly change. It is a simple implementation and a proof of concept of the bus. You can take a look at the PoC code here.

References

  1. Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec. 2003. The many faces of publish/subscribe. ACM Comput. Surv. 35, 2 (June 2003), 114–131. https://doi.org/10.1145/857076.857078
@rodrigopex rodrigopex added the RFC Request For Comments: want input from the community label May 23, 2022
@henrikbrixandersen henrikbrixandersen added the area: API Changes to public APIs label May 23, 2022
@rodrigopex rodrigopex changed the title Zbus: a message bus system [RFC] Zbus: a message bus system May 23, 2022
@ck-telecom
Contributor

How about this PR: #38611?

@hongshui3000
Contributor

hongshui3000 commented May 24, 2022

#38611 A good PR but abandoned.

@henrikbrixandersen
Member

Regarding alternatives - have you seen KBUS? http://kbus.readthedocs.io/en/latest/specification.html

@rodrigopex
Contributor Author

@henrikbrixandersen I was reading the documentation and noticed that several of its concepts are the same as I am proposing here. However, it does not seem to be a valid alternative because, as they say, it is "for Linux" and means what it says: the Linux kernel is required.

@stephanosio
Member

FWIW, #37223 is trying to implement something similar.

@nordicjm
Collaborator

Another similar zephyr-based IPC system: https://github.com/LairdCP/zephyr_framework

@rodrigopex
Contributor Author

rodrigopex commented May 24, 2022

@carlescufi and others, I superficially read #38611, and I could not find a reason other than complexity for keeping the Event Manager in NCS rather than Zephyr with the requested changes. Has that scenario changed? Is there another reason? It seems to be a powerful tool, but it has its complexities.

Main differences I could catch:

  • EM uses dynamic memory for messages; zbus uses only static memory;
  • EM frees the event message after the event is processed; zbus does not, so the message remains available;
  • EM sends the content of the event to the listeners; the zbus event dispatcher only indicates the changed channel id to the subscriber, which then decides whether to read it;
  • EM seems more verbose in use: the user needs to create a file for each event. In a system with 40 different events, I guess that would be hard to maintain;
  • EM relies on some low-level features such as kernel spinlocks, a linker descriptor file, sys_list_append, etc.; zbus uses only semaphores and message queues.

I guess zbus is similar in features but simpler to use and maintain. In my opinion, the bus metaphor is easier to understand and use than events. Developers explicitly describe all the channels and subscribers in a centralized way (the zbus_channels.h file).

@ck-telecom
Contributor

ck-telecom commented May 25, 2022

> The sequence starts with the user pressing the button. After that, inside the ISR of the button, the channel BUTTON is published. The bus will notify the App Logic thread through its queue. The App Logic will wake up and change the LED state by reading, inverting the message content and publishing again to the LED channel. The following activity is the Peripherals thread being woken up by the bus indicating the LED channel has been changed. The Peripherals thread reads the LED message and updates the LED state. Using zbus, the developer can easily change/add the source of the toggle action. There is no coupling between the threads.

There seem to be a lot of threads in a complicated use case; this will waste a lot of time on thread switches, for example a sensor publishing data at a high sampling rate to other threads (modules).

@rodrigopex
Contributor Author

@lairdjm do you have sample code using it? I would like to see it, thanks.

@nordicjm
Collaborator

> @lairdjm do you have a sample code using that? I would like to see that. thanks

There is: https://github.com/LairdCP/BLE_Gateway_Firmware, though the version of the framework on GitHub, which that application uses, is a bit outdated. Parts have been rewritten or improved, and the files are now auto-generated from all the input files using CMake functions. A snapshot of the newer version can be seen at https://github.com/lairdjm/framework_zephyr (it needs another repository for the CMake functions, which essentially just add files to lists).

@rodrigopex
Contributor Author

rodrigopex commented May 25, 2022

> there seems to be lots of threads if in a complicated user case, this will waste lots of time while switch threads, just like sensor in a sampling rate to publish sensor data to other thread(modules)

@ck-telecom You can use callbacks for that if you prefer; no threads are needed. This is just example code. If you look at the integration test, you can see an example of using callbacks. If you need speed to transfer sensor data, you could use a pipe instead. I have added a sample to illustrate that: take a look at https://github.com/zephyr-bus/zbus/tree/main/samples/work_queue, where you can see many ways of using zbus.

@rodrigopex
Contributor Author

@stephanosio I changed the prefix from zb_ to zbus_ in the code. It seems better. Thank you.

@rodrigopex
Contributor Author

I have updated the Example of use section with a more realistic example. I hope it helps.

@rodrigopex
Contributor Author

@zycz would you mind verifying if what I wrote in the comparison table (Alternatives section of RFC) regarding the Event Manager is true/correct?

@lairdjm @stephanosio could you please summarize the features of your implementations using the Comparison item column as a template? I would add them to the table. Thank you very much.

@nordicjm
Collaborator

@rodrigopex Sure:

Made for: Multiple receiver broadcast messaging framework system
Metaphor:
Message definition time: Compile time, though a custom message could be created dynamically; the receivers would need to know how to parse the struct
Message definition approach: Multiple files, usually one per module, all combined into an auto-generated output file by cmake
Message persistency: Cleaned up if DISPATCH_OK is returned by the handler, otherwise remains
Message distribution pattern: Event/listener
Subscription style: Compile time, but runtime filtering was planned and partially added
Message transmission style: Sends the event to listeners one by one for them to process (if a broadcast); unicast goes to a single listener
Subscriber execution styles: Synchronous, inside a thread
Implementation approach: Dynamic memory (buffer pool), msgq
Use code generation (tools/scripts): cmake is used for auto-generating 3 sets of output files: message IDs, message codes, and message types (containing structs), which can be globally included
Maturity: Production-ready

@zycz
Copy link
Contributor

zycz commented May 26, 2022

It is worth mentioning that App Event manager provides a mechanism for adding extensions. There is a hook list to which you can add your own functionality. You can see examples of how it is done in the NCS app_event_manager_profiler_tracer or event_manager_proxy. Profiler was created to visualize event propagation over time, and proxy allows exporting events between cores.

Only small changes to what @rodrigopex wrote about Event manager:

Regarding Message allocation style, it's Dynamic (execution-time) by default, but the alloc function is "weak" so it can be overridden and static allocation used instead.
Regarding Subscription style, it's Compile-time, but run-time subscription could be added using extension hooks.
Regarding Message transmission style, it transmits the data to the listeners one by one; you can define a priority to regulate the transmission order.
Regarding Subscriber execution styles, it's Synchronous (by callback)

@rodrigopex
Copy link
Contributor Author

@lairdjm Could you please check the text and clarify the marked points (with ???) in your column? Try to normalize with the other cells in the table.

@zycz the adjustments were made, can you please double-check that?

Thank you, guys.

@nordicjm
Copy link
Collaborator

@rodrigopex Looks good! Additions:

Metaphor: Not really sure how best to describe it; it can be used for both normal messaging and event data
Message allocation style: dynamic (heap)
Message transmission style: The order cannot be re-defined; it is defined by the order in which listeners register themselves at startup
Subscriber execution styles: in a thread with a msgq
Extension mechanism (you can add your own functionality): Not built in, no

@zycz
Copy link
Contributor

zycz commented May 27, 2022

@zycz the adjustments were made, can you please double-check that?

Everything looks ok :)

@rodrigopex
Copy link
Contributor Author

Subscriber execution styles: in a thread with a msgq

@lairdjm it seems to be asynchronous, right? If it uses message queues to transmit the data, the thread (subscriber/listener/receiver/consumer/whatever 😄) would choose when to read it, right?

@hongshui3000
Copy link
Contributor

@rodrigopex
Is it possible to make zbus an external module of zephyr, so that I can directly import this module and evaluate it in my own application?

@rodrigopex
Copy link
Contributor Author

@rodrigopex Is it possible to make zbus an external module of zephyr, so that I can directly import this module and evaluate it in my own application?

@hongshui3000 not yet, but I will add the necessary files and let you know.

@rodrigopex
Copy link
Contributor Author

rodrigopex commented May 31, 2022

Done! Zbus as a Zephyr module is ok. Check the Zbus as a module sample. @hongshui3000 if you need help, you can contact me on Discord.

@hongshui3000
Copy link
Contributor

@rodrigopex good job . Thanks

@cfriedt
Copy link
Member

cfriedt commented Jun 2, 2022

@rodrigopex - I'm just looking over the RFC now, and I wish I would have attended the API meeting.

Just at a glance, it looks like messages are maybe fixed-size based on a C struct and use native byte ordering.

Would it be relatively easy to support other, possibly self-describing, serialization formats in Zbus? I would imagine that users might prefer to have some flexibility there.

https://en.wikipedia.org/wiki/Comparison_of_data-serialization_formats

@rodrigopex
Copy link
Contributor Author

@rodrigopex - I'm just looking over the RFC now, and I wish I would have attended the API meeting.

@cfriedt thanks for your comments. I attended the meeting; we discussed zbus a lot.

Just at a glance, it looks like messages are maybe fixed-size based on a C struct and use native byte ordering.

Yes, the message is a fixed-size struct or union to force the publishers and subscribers to use a channel properly. It will work as an API.

Would it be relatively easy to support other, possibly self-describing, serialization formats in Zbus? I would imagine that users might prefer to have some flexibility there.

Sure, for free (though not perfect): you can use a struct with an array-of-bytes field, where you can store your data in any format.

https://en.wikipedia.org/wiki/Comparison_of_data-serialization_formats

During the Zbus design, I considered using this kind of approach to define messages, but I was concerned about speed and memory usage, so I decided to use structs and unions. I am convinced that it is a good solution for now. Do you think using a serialization format is better for defining messages? My concern about it is the data's size variability and the need for encoders and decoders, which would increase the communication overhead. To me, it should be done by the application using a raw message type to carry the data.

@hongshui3000
Copy link
Contributor

#38611 PR, as an external module: https://github.com/hongshui3000/event_manager

@rodrigopex
Copy link
Contributor Author

rodrigopex commented Jun 3, 2022

I have implemented the 256000 benchmark for the Event Manager: https://github.com/rodrigopex/event_manager/blob/main/samples/

@cfriedt
Copy link
Member

cfriedt commented Jun 3, 2022

@rodrigopex - I'm just looking over the RFC now, and I wish I would have attended the API meeting.

@cfriedt thanks for your comments. I attended the meeting; we discussed zbus a lot.

Just at a glance, it looks like messages are maybe fixed-size based on a C struct and use native byte ordering.

Yes, the message is a fixed-size struct or union to force the publishers and subscribers to use a channel properly. It will work as an API.

Would it be relatively easy to support other, possibly self-describing, serialization formats in Zbus? I would imagine that users might prefer to have some flexibility there.

During the Zbus design, I considered using this kind of approach to define messages, but I was concerned about speed and memory usage, so I decided to use structs and unions. I am convinced that it is a good solution for now. Do you think using a serialization format is better for defining messages? My concern about it is the data's size variability and the need for encoders and decoders, which would increase the communication overhead. To me, it should be done by the application using a raw message type to carry the data.

I agree that for most use cases relatively small, statically allocated, fixed-size buffers are perfectly fine, but it would be nice if the API had the ability to use larger, variable-sized buffers, possibly dynamically allocated. Technically, a pointer to an arbitrary memory location could be stored inside the fixed-size buffer - I wonder if there is some kind of flag or enumeration that could be used in that case.

Just curious - I love the Zbus concept, and feel it's absolutely a needed feature. I like that it's focused on remaining small as well, but of course some Zephyr users are at the complete opposite end of the spectrum and run on massive hyperscale infrastructure so having flexibility is important as well.

Thanks for putting the RFC together :-)

@hongshui3000
Copy link
Contributor

hongshui3000 commented Jun 4, 2022

I modified the event manager so that it can support single-threaded development. I hope this improves development efficiency for single-threaded Zephyr applications. https://github.com/hongshui3000/event_manager/tree/main/samples/sensor_measurement

@rodrigopex
Copy link
Contributor Author

rodrigopex commented Jun 4, 2022

@rodrigopex - I'm just looking over the RFC now, and I wish I would have attended the API meeting.

@cfriedt thanks for your comments. I attended the meeting; we discussed zbus a lot.

Just at a glance, it looks like messages are maybe fixed-size based on a C struct and use native byte ordering.

Yes, the message is a fixed-size struct or union to force the publishers and subscribers to use a channel properly. It will work as an API.

Would it be relatively easy to support other, possibly self-describing, serialization formats in Zbus? I would imagine that users might prefer to have some flexibility there.

During the Zbus design, I considered using this kind of approach to define messages, but I was concerned about speed and memory usage, so I decided to use structs and unions. I am convinced that it is a good solution for now. Do you think using a serialization format is better for defining messages? My concern about it is the data's size variability and the need for encoders and decoders, which would increase the communication overhead. To me, it should be done by the application using a raw message type to carry the data.

I agree that for most use cases relatively small, statically allocated, fixed-size buffers are perfectly fine, but it would be nice if the API had the ability to use larger, variable-sized buffers, possibly dynamically allocated. Technically, a pointer to an arbitrary memory location could be stored inside the fixed-size buffer - I wonder if there is some kind of flag or enumeration that could be used in that case.

Just curious - I love the Zbus concept, and feel it's absolutely a needed feature. I like that it's focused on remaining small as well, but of course some Zephyr users are at the complete opposite end of the spectrum and run on massive hyperscale infrastructure so having flexibility is important as well.

Thanks for putting the RFC together :-)

@cfriedt thank you for your comments and suggestions. That is possible for sure, and not hard to add. I guess it would benefit the solution overall.

What do you think about the following possible API?

The Dynamic Channel definition would be like that:

ZBUS_DYN_CHANNEL(
    my_dyn_channel,
    ZBUS_CHANNEL_SUBSCRIBERS(sub1, sub2, sub3)
);

Allocating and freeing can be done externally by the user:

struct user_data *user_allocated_data = (struct user_data *) malloc(sizeof(struct user_data));
zbus_dyn_chan_alloc(my_dyn_channel, user_allocated_data, sizeof(struct user_data), K_MSEC(200));
void * mem_ref = NULL;
zbus_dyn_chan_dealloc(my_dyn_channel, &mem_ref, K_NO_WAIT); /* sets the channel's pointer to NULL and the size to zero; potential memory leak here if mem_ref is not freed */
free(mem_ref);

Checking if it is already allocated would be like:

zbus_dyn_chan_is_allocated(my_dyn_channel, K_MSEC(200));

Retrieving the channel's size would be like:

size_t chan_size = zbus_dyn_chan_size(my_dyn_channel, K_MSEC(200));

Publishing and reading would be like:

zbus_dyn_chan_pub(my_dyn_channel, orig, sizeof(orig), K_MSEC(200));
zbus_dyn_chan_read(my_dyn_channel, dest, sizeof(dest), K_MSEC(200));

Advanced API

Borrow reference of a dynamic channel. Total control over that:

typedef struct {
    void *msg_ref;
    size_t msg_size;
} zbus_dyn_msg_t;
//...
zbus_dyn_msg_t msg_ref = {0};
zbus_dyn_chan_borrow(my_dyn_channel, &msg_ref, K_MSEC(200)); // locks the channel against other threads by taking the channel's semaphore
// Do whatever the user wants
zbus_dyn_chan_give_back(my_dyn_channel, K_MSEC(200)); // releases the channel to other threads by giving the channel's semaphore

@rodrigopex
Copy link
Contributor Author

Now it is possible to make a channel dynamic. There is a sample showing how to do that: dyn_channel. I have added the possibility of claiming the channel's message and performing some actions while it is claimed. After that, the developer must call the finish function. The idea is similar to ring buffers.

@rodrigopex
Copy link
Contributor Author

Another feature added: a message validator. If you provide a validator function in the channel declaration, the publishing action will check whether the message is valid. If it is not, publishing will fail.

@andrea-cowboy
Copy link

I will definitely look forward to zbus getting into Zephyr. For event-driven designs this brings robustness (shared and tested infrastructure) and enables more systems to spontaneously grow in an event-driven direction within the Zephyr community.

Nice work @rodrigopex !

@gregshue
Copy link

This sounds very similar to something I worked with for 8 generations of the OWEN printer firmware product line at HP. Is anyone interested in learnings from that experience?

@rodrigopex
Copy link
Contributor Author

This sounds very similar to something I worked with for 8 generations of the OWEN printer firmware product line at HP. Is anyone interested in learnings from that experience?

Hi @gregshue. I am interested to learn that. How do you want to proceed? Discord or a call?

@gregshue
Copy link

I am interested to learn that. How do you want to proceed? Discord or a call?

Let's capture it here so others can consider it too.

The event dispatcher must have the highest priority among the user's threads to guarantee a proper execution.

This eventually proved to be a barrier to scalability & composability. The same will happen in Zephyr. Part of the problem is not having a clear definition of "user's threads". In my downstream repo each "application" has an empty main(){}. I have many proprietary, composable drivers and subsystems. Each driver and subsystem may have one or more threads, and is combined with other subsystems/drivers from zephyrproject-rtos. Each build has a different feature set. Which are the "user threads"? At the architectural level, any/all of them. That means every component/driver must be designed to tolerate being preempted by the messaging system for an unpredictable amount of time, and every integrator must verify that all timing requirements of drivers/components are still being met even when the messaging system injects an unpredictable latency.

For a lightly loaded system this can appear to work, but for a heavily loaded one we eventually experienced too much latency for our hard and soft real-time threads. We then recognized that a single announcer thread effectively forced unnecessary priority elevation for handling most channels. Eventually this design was abandoned in favor of components providing an instantiation of a common, extensible notification interface so that notifications were distributed at the priority of their source. Each subscriber already had to consolidate/resolve inputs from multiple sources while running at its assigned preemption priority, so components with threads had a private message queue and components without threads had a private mutex.

This problem becomes more relevant for the Zephyr ecosystem. Because we are targeting reuse of code on multiple CPU architectures, we must design a solution for the minimum hardware functionality and the maximum feature set. In this case that means a CPU that has only one hardware interrupt preemption level and a system that needs to use that for catastrophic error handling (e.g., hardware watchdog warning). In this case all "IRQ" code must execute in thread context, which may get preempted by the messaging system. We found out it was important to minimize unnecessary complexity faced by the integrators.

Zephyr has only a limited set of Inter-Process Communication (IPC) mechanisms

Do you really mean "Inter-Process"? Zephyr doesn't have processes (address remapping). The closest is asymmetric multi-processing (different cores, possibly with some private and some shared memory). At HP we found that our composable architecture and internal interfaces had to be designed with some awareness of whether functionality was implemented on the local core or a remote one. This affected how "cancel" was designed, how long timeouts were allowed to be, and what steps needed to be taken for recovery from partial reset. Before we abandoned the central messaging service design, we figured out the impact of distributing the service across multiple (asymmetric) cores and what information must be shared between the two. The amount of unavoidable complexity was larger than expected.

@rodrigopex
Copy link
Contributor Author

@gregshue

Which are the "user threads"?

I think of drivers and subsystems as "system threads." Maybe I am wrong, but I think of "user threads" as the ones created explicitly with K_THREAD_DEFINE (or a similar approach). So maybe internally, between drivers and subsystems, the message bus does not in fact fit. Think of it as D-Bus for Linux; we do not use D-Bus to talk to device drivers; we use IOCTLs, Netlink, or syscalls, right? Same here; only "applications" ("user threads") should use zbus.

For a lightly loaded system this can appear to work, but for a heavily loaded one we eventually experienced too much latency for our hard and soft real-time threads.

Latency is a complicated part of communication. It depends on several variables: maybe your MCU is too busy to guarantee a deadline, the threads' priorities are unbalanced, or someone made a bad communication-mechanism choice. Nevertheless, the results were good in my tests, with well-balanced priorities and a reasonable publishing rate. Maybe it should be tested in more scenarios.

If, in your case, you have a stream of bytes, it would be better to use pipes or message queues. The message bus provides a way for "user threads" to exchange messages (information or events).

Do you really mean "Inter-Process"?

Yes, I do. I guess you are thinking of "Inter-Processor." The message bus is not designed to solve "inter-processor" communication problems. This is still a big challenge, and I did not focus on that for now.

Before we abandoned the central messaging service design we figured out the impact of distributing the service across multiple (asymmetric) cores and what information must be shared between the two.

For simplicity, each core will have its own zbus instance. Still, it is possible to use zbus' extensibility to keep both synchronized using an IPC service (OpenAMP, RPMSG, or ICMSG). Again, you can implement an extension module to choose the information needed on both sides.

The final thought I would like to share here is that the message bus (zbus) is for Zephyr what D-Bus is for Linux, keeping the complexity proportional.

@gregshue
Copy link

I think of drivers and subsystems as "system threads."

From previous experience, my entire printer solution would be in proprietary drivers + subsystems. My main() is empty. Some of my subsystems may execute under user space, at a preemption priority more urgent than other system services. This would include the native GUI, the fax telecom protocol, and motor control servos. So in this case everything is "system" and nothing is "application"?

Think of it as D-Bus for Linux; we do not use D-Bus to talk to device drivers; we use IOCTLs, Netlink, or syscalls, right?

Perhaps that is true in Linux, but in my prior experiences the RTOS systems all ran in kernel space, often avoided the UNISTD API (IOCTLs), and cooperative scheduling/time slices were never used. Looking at Zephyr, we also must support everything running in kernel space, device drivers are accessed via custom APIs rather than IOCTLs, and product code can be interspersed across any set of preemption priorities.

maybe your MCU is too busy to guarantee a deadline;

Our systems had to be designed to guarantee timing deadlines were met. Many parts of the system were actually best-effort, so no timing deadlines existed for those parts.

Yes, I do. I guess you are thinking of "Inter-Processor."

Actually, I thought you were thinking of "Inter-Processor." Zephyr does not have processes. It only has one flat address space with segmented access control. When present, the MMU is only used as a glorified MPU and does not do address translation.

The final thought I would like to share here is the message bus (zbus) is for Zephyr as D-Bus is for Linux keeping proportionalities of complexity.

I understand the model and the design. What I am sharing from years of experience with shipping products is that having all notifications published through a single, high-priority thread has been shown to be unnecessarily limiting in a scalable, composable RTOS-based platform like Zephyr.

@gregshue
Copy link

Looking at the alternatives table, I see the Zbus message definition approach is "Centralized, single file to describe channels and subscriptions". This is not acceptable. The Zephyr ecosystem is already a modular, (somewhat) extensible architecture. Reuse is implied in the Zephyr mission statement's objectives. Extensibility is required, and a single-file definition is not extensible. I expect this is why all the other alternatives have a distributed definition.

@rodrigopex
Copy link
Contributor Author

Looking at the alternatives table, I see the Zbus message definition approach is "Centralized, single file to describe channels and subscriptions". This is not acceptable. The Zephyr ecosystem is already a modular, (somewhat) extensible architecture. Reuse is implied in the Zephyr mission statement's objectives. Extensibility is required, and a single-file definition is not extensible. I expect this is why all the other alternatives have a distributed definition.

@gregshue sorry for that. The information in the RFC is outdated. It is now possible to create channels in a distributed way, and it is also possible to create independent modules with it. I will fix it and let you know. I wrote the RFC before starting to discuss it with the community; several aspects of the solution have evolved since then.

@rodrigopex
Copy link
Contributor Author

@gregshue, thanks to what you shared, zbus now has a "virtual distributed event dispatcher," which improved the solution in many ways. I understand the needs you are pointing out; some of them we can address, but others we can't. Let's say zbus is a message bus that helps threads talk to each other inside Zephyr. Sounds good? I will add a sample showing how to implement an independent module with zbus and let you know. You can tell me whether it is enough to keep composability. Thank you for your comments and suggestions.

@rodrigopex
Copy link
Contributor Author

Zbus was merged #48509.

@zeenix
Copy link

zeenix commented Jan 11, 2024

FWIW, I was already using the name zbus for my IPC library since 2019. Fortunately, that's a Rust library so chances of confusion are low. :)
