Using Kernel Queues (kqueue) notifications in Swift Oct 9 2019 Latest Update: Jun 27 2020

Many components of macOS trace their roots back to BSD. One key aspect inherited from BSD is the Kernel Event Notification mechanism know as Kernel Queues (kqueues for short).

In the past, before kqueue(2)1/ kevent(2)2, when we wanted to track events related to a socket, or if a file descriptor changed, we used a polling mechanism to test for readiness/events (maybe using select(2) or poll(2)). This polling techniques were inefficient and got replaced by the more efficient kqueue API. Instead of continually polling, we can now be notified about events by the kernel. Also, mechanisms like select(2) and poll(2) were restricted to work with file descriptors. kqueue increases the range of operating systems events to monitor not only for file descriptors but also elements like signals, timers, network devices, etc.

In this post, we are going to explore how to use kqueue using Swift. The examples will include monitoring sockets, signals, and file system events. kqueue also support monitor events from timers, processes, network devices, memory, and a few others3. We are going to focus on three cases (sockets, signals, filesystem), but using the three cases as a base, you'll be able to use kqueue on any of the supported event types.

Let's begin by exploring the idea behind kqueue.

* You can check the full code in the GitHub Repository

Using kernel event notifications

As we mentioned before, it is inefficient to poll for events, and sometimes we can't obtain the event directly. So how can we know if an event occurred?

We know that all the system calls always notify the kernel, so it would be the perfect place to get the notifications from.

That is the idea behind kqueue. We create a kernel event queue, where we show our interest to the kernel for specific events, and when the kernel has some information about the events we are interested in it'll push it to our queue.

The process is as follows:

  1. Create a kernel event queue using the system call kqueue this returns a file descriptor.
  2. We set up our kqueue to register our interest in specific events by using the system call kevent.
  3. Again we use kevent but this time to listen for the events to be triggered.

That's it. Simple right?

There is more to it, but that is the skeleton from where we'll build our understanding of kqueues.

For our example, we are going to use a socket-based server. I'll use the same code for the echo server we created in the post Using BSD sockets in Swift. I'll focus on the code related to kqueue if you have any questions on the server code check the original post.

Ok, Let's get started by monitoring socket events.

Monitoring socket events using kqueue

In our previous implementation of the echo server, we could work only with one client at a time. Now let's change that, let's support multiple clients at the same time using GDC. Also, we are going to set up an event listener (using kevent) that will log how many characters are ready for reading on a specific socket(file descriptor). After getting the notification, we'll continue with what we previously had implemented, read the data and echo it back the client.

First, we are going to encapsulate the code that handles the reading from the socket, that way we don't have it cluttering our code. The function will look like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    func readFrom(socket fd: Int32) {
      let MTU = 65536
      var buffer = UnsafeMutableRawPointer.allocate(byteCount: MTU,alignment: MemoryLayout<CChar>.size)

      let readResult = read(fd, &buffer, MTU)

      if (readResult == 0) {
        return  // end of file
      } else if (readResult == -1) {
        print("Error reading form client\(fd) - \(errno)")
        return  // error
      } else {
        //This is an ugly way to add the null-terminator at the end of the buffer we just read
        withUnsafeMutablePointer(to: &buffer) {
          $0.withMemoryRebound(to: UInt8.self, capacity: readResult + 1) {
            $0.advanced(by: readResult).assign(repeating: 0, count: 1)
          }
        }
        let strResult = withUnsafePointer(to: &buffer) {
          $0.withMemoryRebound(to: CChar.self, capacity: MemoryLayout.size(ofValue: readResult)) {
            String(cString: $0)
          }
        }
        print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
        write(fd, &buffer, readResult)
      }
    }

Now we can focus on the code for setting our kqueue and kevent. We are going to create a function to handle the set up. Create the new function setSockKqueue with the following signature:

1
func setSockKqueue(fd: Int32)

Remember our steps for setting up the kqueue:

  1. Create a kernel event queue using the system call kqueue this returns a file descriptor.
  2. We set up our kqueue to register our interest in specific events by using the system call kevent.
  3. Again we use kevent but this time to listen for the events to be triggered.

Ok, let's create our kqueue:

1
2
3
4
5
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }

The kqueue syscall returns a file descriptor. We can access our kernel queue using that file descriptor. If an error occurs, the kqueue returns -1.

Now we have to register our interest in a specific event. We use the system call kevent to register our kqueue for some event. Also, we use the same system call, kevent to read any pending events on the queue. The system call signature is the following:

1
2
3
kevent(_ kq: Int32, _ changelist: UnsafePointer<kevent>!,
 _ nchanges: Int32, _ eventlist: UnsafeMutablePointer<kevent>!,
 _ nevents: Int32, _ timeout: UnsafePointer<timespec>!) -> Int32

If we send a kevent structure on the changelist parameter, the system call will use that structure information to register our interest in the event described by the structure. If the changelist parameter is nil, the system call will wait(blocking) until an event matching any of our queue's interest is triggered and will save the event information in the kevent structure passed in the eventlist parameter. This process might sound confusing so let's see it in our code to make things clear.

First, let's view the definition of the kevent structure:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public struct kevent {

    public var ident: UInt /* identifier for this event */

    public var filter: Int16 /* filter for event */

    public var flags: UInt16 /* general flags */

    public var fflags: UInt32 /* filter-specific flags */

    public var data: Int /* filter-specific data */

    public var udata: UnsafeMutableRawPointer! /* opaque user data identifier */

    public init()

    public init(ident: UInt, filter: Int16, flags: UInt16, fflags: UInt32, data: Int, udata: UnsafeMutableRawPointer!)
}

Let me explain each of the structure properties:

* Remember that the structure can be populated by you, or by the kevent system call. So some of the fields make sense only when written by the triggered event.

Ok, let's create our structure to register in our kqueue:

1
2
3
4
5
6
7
8
9
10
        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )

Here we are using the file descriptor fd that we received as a parameter. We want to be notified when the socket (represented by the file descriptor) has data to be read. We want to add the following actions EV_ADD and EV_ENABLE, this means add this event to our kqueue and allow the event to be returned when triggered. The flags, data, and udata we are not going to use so we set them to 0 or nil on each case.

Now we can call kevent with our kqueue and kevent structure to set up the kernel queue.

1
2
3
4
        // This is where the kqueue is register with our 
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)

Ok, now we are going to use the kevent to "listen" for events to be triggered.

We'll be running this code in a different thread using DispatchQueue asynchronously, so the main thread is not blocked.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }

We won't go into detail for the DispatchQueue code. Let's focus on the code related to kevent.

We wait for any event on our sockKqueue to be triggered and then we read the data that is populated in the event structure. After we are sure that the socket has data we call readFrom(socket:) to read the data. Then we start the loop again and call kevent that will block and wait for another event to be triggered. We could have also added a timer as a time out. An example code could look like this:

1
2
            var timeout = timespec(tv_sec: 5, tv_nsec: 0)
            let status = kevent(sockKqueue, nil, 0, &event, 1, &timeout)

This code will cause the kevent to timeout after 5 seconds, and the return value stored in status would be 0.

Le'ts see the full body of our function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }


        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is register with our 
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)


        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                print("Status: \(status) - Esto es el event after kevent:\(event)")
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }
    }

Ok, next you'll see the complete code for Server.swift:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import Foundation

class Server {

    let servicePort = "1234"
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0

    func readFrom(socket fd: Int32) {
      let MTU = 65536
      var buffer = UnsafeMutableRawPointer.allocate(byteCount: MTU,alignment: MemoryLayout<CChar>.size)

      let readResult = read(fd, &buffer, MTU)

      if (readResult == 0) {
        return  // end of file
      } else if (readResult == -1) {
        print("Error reading form client\(fd) - \(errno)")
        return  // error
      } else {
        //This is an ugly way to add the null-terminator at the end of the buffer we just read
        withUnsafeMutablePointer(to: &buffer) {
          $0.withMemoryRebound(to: UInt8.self, capacity: readResult + 1) {
            $0.advanced(by: readResult).assign(repeating: 0, count: 1)
          }
        }
        let strResult = withUnsafePointer(to: &buffer) {
          $0.withMemoryRebound(to: CChar.self, capacity: MemoryLayout.size(ofValue: readResult)) {
            String(cString: $0)
          }
        }
        print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
        write(fd, &buffer, readResult)
      }
    }

    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }


        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is register with our 
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)


        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }
    }

    func start() {
        print("Server starting...")

        let socketFD = socket(AF_INET6, //Domain [AF_INET,AF_INET6, AF_UNIX]
            SOCK_STREAM, //Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
            IPPROTO_TCP  //Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
        )//Return a FileDescriptor -1 = error
        if socketFD == -1 {
            print("Error creating BSD Socket")
            return
        }

        var hints = addrinfo(
            ai_flags: AI_PASSIVE,       // Assign the address of the local host to the socket structures
            ai_family: AF_UNSPEC,       // Either IPv4 or IPv6
            ai_socktype: SOCK_STREAM,   // TCP
            ai_protocol: 0,
            ai_addrlen: 0,
            ai_canonname: nil,
            ai_addr: nil,
            ai_next: nil)

        var servinfo: UnsafeMutablePointer<addrinfo>? = nil
        let addrInfoResult = getaddrinfo(
            nil,                        // Any interface
            servicePort,                   // The port on which will be listenend
            &hints,                     // Protocol configuration as per above
            &servinfo)

        if addrInfoResult != 0 {
            print("Error getting address info: \(errno)")
            return
        }

        let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))

        if bindResult == -1 {
            print("Error binding socket to Address: \(errno)")
            return
        }

        let listenResult = listen(socketFD, //Socket File descriptor
            8         // The backlog argument defines the maximum length the queue of pending connections may grow to
        )

        if listenResult == -1 {
            print("Error setting our socket to listen")
            return
        }

        while true {
            var addr = sockaddr()
            var addr_len :socklen_t = 0

            print("About to accept")
            let clientFD = accept(socketFD, &addr, &addr_len)
            print("Accepted new client with file descriptor: \(clientFD)")

            if clientFD == -1 {
                print("Error accepting connection")
            }

            setSockKqueue(fd: clientFD)
        }
    }
}

That's it. Everything is ready. We can test our kqueue code. Let's run our server:

1
2
# if we are using Swift Package Manager
$ swift run

On another shell, we can connect to our server (on port 234) using ncat (or rdncat if you are using the one we created in the post about Network.framework). If we send two messages hi and hello, we would see something similar to this:

1
2
3
4
5
6
7
8
9
10
11
Welcome to our simple echo server!
Server starting...
About to accept
Accepted new client with file descriptor: 7
About to accept
File descriptor: 7 - has 2 characters for reading
Received form client(7) % hi
File descriptor: 7 - has 5 characters for reading
Received form client(7) % hello
The socket (7) has been closed.
Bye from kevent

You can see that our kevent is working correctly.

Let's now use kqueue/kevent to handle signals.

Using kqueue/kevent to listen for signals

If you are not familiar with signals, signals are a basic form of IPC (Inter-Process Communication). When we send a signal to a process to interact with it, the process is interrupted by the kernel to deliver the signal, and the process can react to the signal or ignore it (Except SIGKILL=9 which can't be ignored).

Some of the common signals are (check your man 3 signal for the ones supported by your system):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
     No    Name         Default Action       Description
     1     SIGHUP       terminate process    terminal line hangup
     2     SIGINT       terminate process    interrupt program
     3     SIGQUIT      create core image    quit program
     4     SIGILL       create core image    illegal instruction
     5     SIGTRAP      create core image    trace trap
     6     SIGABRT      create core image    abort program (formerly SIGIOT)
     7     SIGEMT       create core image    emulate instruction executed
     8     SIGFPE       create core image    floating-point exception
     9     SIGKILL      terminate process    kill program
     10    SIGBUS       create core image    bus error
     11    SIGSEGV      create core image    segmentation violation
     12    SIGSYS       create core image    non-existent system call invoked
     13    SIGPIPE      terminate process    write on a pipe with no reader
     14    SIGALRM      terminate process    real-time timer expired
     15    SIGTERM      terminate process    software termination signal
     16    SIGURG       discard signal       urgent condition present on socket
     17    SIGSTOP      stop process         stop (cannot be caught or ignored)
     18    SIGTSTP      stop process         stop signal generated from keyboard
     19    SIGCONT      discard signal       continue after stop
     20    SIGCHLD      discard signal       child status has changed
     21    SIGTTIN      stop process         background read attempted from control terminal
     22    SIGTTOU      stop process         background write attempted to control terminal
     23    SIGIO        discard signal       I/O is possible on a descriptor (see fcntl(2))
     24    SIGXCPU      terminate process    cpu time limit exceeded (see setrlimit(2))
     25    SIGXFSZ      terminate process    file size limit exceeded (see setrlimit(2))
     26    SIGVTALRM    terminate process    virtual time alarm (see setitimer(2))
     27    SIGPROF      terminate process    profiling timer alarm (see setitimer(2))
     28    SIGWINCH     discard signal       Window size change
     29    SIGINFO      discard signal       status request from keyboard
     30    SIGUSR1      terminate process    User defined signal 1
     31    SIGUSR2      terminate process    User defined signal 2

We can use the kill command on our shell to send signals to a specific PID.

And that is what we are going to be using kill OUR_PROCESS_PID. Our program will gracefully handle SIGTERM (requesting to stop our program), by notifying all our active clients with a message "Server is shutting down" and exiting, instead of abruptly exiting.

Ok, we are going to work on our main.swift. We are going to create a specific function to set up our kqueue. We'll call it setSignalKqueue. The process is the same that we used for setting up the kqueue for our socket. So I'll show you the whole function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func setSignalKqueue() {
    print("Setting up Signal Handler")
    let sockKqueue = kqueue()
    if sockKqueue == -1 {
        print("Error creating kqueue")
        exit(EXIT_FAILURE)
    }

    // the signal API takes precedence over kqueue event handling
    // to avoid this behaviour we are going to ignore the SIGTERM
    // and handle it using our kqueue implementation
    signal (SIGTERM, SIG_IGN);


    var edit = kevent(
        ident: UInt(SIGTERM),
        filter: Int16(EVFILT_SIGNAL),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil
    )
    kevent(sockKqueue, &edit, 1, nil, 0, nil)


    DispatchQueue.global(qos: .default).async {
        while true {
            var event = kevent()
            let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
            if  status == 0 {
                print("Timeout")
            } else if status > 0 {
                print("We got a terminate signal! \(event)")
                server.stop()
                exit(130) //Exit code for "Script terminated by Control-C"
            } else {
                print("Error reading kevent")
                close(sockKqueue)
                exit(EXIT_FAILURE)
            }
        }
    }
}

Note that we are ignoring the default signal API so that we can use our kqueue implementation. Our kevent structure is similar to the one we created for our socket kqueue. The difference is in the ident and filter fields. We are using the filter EVFILT_SIGNAL that tells the kernel that ident represents a signal we want to be observed for events.

After the initialization, we are going to wait for any event asynchronously, and if we get a SIGTERM we'll call server.stop (Which we still have to implement). Also, notice that we use the exit code 130, which means "Script terminated by Control-C". Exit codes are important, always use the correct exit code, so our programs behave as good *nix commands are expected to.

alright this is how our main should look:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import Foundation

print("Welcome to our simple echo server!")


var server = Server()


func setSignalKqueue() {
    print("Setting up Signal Handler")
    let sockKqueue = kqueue()
    if sockKqueue == -1 {
        print("Error creating kqueue")
        exit(EXIT_FAILURE)
    }

    // the signal API takes precedence over kqueue event handling
    // to avoid this behaviour we are going to ignore the SIGTERM
    // and handle it using our kqueue implementation
    signal (SIGTERM, SIG_IGN);


    var edit = kevent(
        ident: UInt(SIGTERM),
        filter: Int16(EVFILT_SIGNAL),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil
    )
    kevent(sockKqueue, &edit, 1, nil, 0, nil)


    DispatchQueue.global(qos: .default).async {
        while true {
            var event = kevent()
            let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
            if  status == 0 {
                print("Timeout")
            } else if status > 0 {
                print("We got a terminate signal! \(event)")
                server.stop()
                exit(130) //Exit code for "Script terminated by Control-C"
            } else {
                print("Error reading kevent")
                close(sockKqueue)
                exit(EXIT_FAILURE)
            }
        }
    }
}
setSignalKqueue()
server.start()



RunLoop.main.run()
exit(EXIT_SUCCESS)

Now let's add stop to our Server.swift.

First, we are going to add a new property to our Server an array to keep track of all of the current Clients.

1
var clients = [Int32]() //File descriptors are represented by Int32

Now we need to add the clients to the array when we accept them. Inside the start function, after we accepted the new client connection we'll append it to our clients array:

1
2
3
4
5
6
7
8
9
10
11
...
            print("About to accept")
            let clientFD = accept(socketFD, &addr, &addr_len)
            print("Accepted new client with file descriptor: \(clientFD)")

            if clientFD == -1 {
                print("Error accepting connection")
                continue
            }
            clients.append(clientFD)
...

Now we have to remove the clients when the connection is closed. In the function that handles the socket kevent, we also receive a notification when the client socket gets closed so there we are going to remove the client from the clients array.

1
2
3
4
5
6
7
8
9
...   
                 if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        if let index = self.clients.firstIndex(of: fd) {
                            self.clients.remove(at: index)
                        }
                        break
                    }
...

Now we are going to create a helper function to write to a specific socket:

1
2
3
    func writeTo(socket fd: Int32, message: String) {
        write(fd, message.cString(using: .utf8), message.count)
    }

With that we can implement our stop function:

1
2
3
4
5
6
    func stop() {
        for client in clients {
            writeTo(socket: client, message: "Server shutting down")
            close(client)
        }
    }

Here is the full code for Server.swift:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import Foundation

class Server {

    let servicePort = "1234"
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0
    var clients = [Int32]() //File descriptors are represented by Int32

    func stop() {
        for client in clients {
            writeTo(socket: client, message: "Server shutting down")
            close(client)
        }
    }

    func writeTo(socket fd: Int32, message: String) {
        write(fd, message.cString(using: .utf8), message.count)
    }

    func readFrom(socket fd: Int32) {
      let MTU = 65536
      var buffer = UnsafeMutableRawPointer.allocate(byteCount: MTU,alignment: MemoryLayout<CChar>.size)

      let readResult = read(fd, &buffer, MTU)

      if (readResult == 0) {
        return  // end of file
      } else if (readResult == -1) {
        print("Error reading form client\(fd) - \(errno)")
        return  // error
      } else {
        //This is an ugly way to add the null-terminator at the end of the buffer we just read
        withUnsafeMutablePointer(to: &buffer) {
          $0.withMemoryRebound(to: UInt8.self, capacity: readResult + 1) {
            $0.advanced(by: readResult).assign(repeating: 0, count: 1)
          }
        }
        let strResult = withUnsafePointer(to: &buffer) {
          $0.withMemoryRebound(to: CChar.self, capacity: MemoryLayout.size(ofValue: readResult)) {
            String(cString: $0)
          }
        }
        print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
        write(fd, &buffer, readResult)
      }
    }

    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }


        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is register with our 
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)


        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        if let index = self.clients.firstIndex(of: fd) {
                            self.clients.remove(at: index)
                        }
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }
    }

    func start() {
        print("Server starting...")

        let socketFD = socket(AF_INET6, //Domain [AF_INET,AF_INET6, AF_UNIX]
            SOCK_STREAM, //Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
            IPPROTO_TCP  //Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
        )//Return a FileDescriptor -1 = error
        if socketFD == -1 {
            print("Error creating BSD Socket")
            return
        }

        var hints = addrinfo(
            ai_flags: AI_PASSIVE,       // Assign the address of the local host to the socket structures
            ai_family: AF_UNSPEC,       // Either IPv4 or IPv6
            ai_socktype: SOCK_STREAM,   // TCP
            ai_protocol: 0,
            ai_addrlen: 0,
            ai_canonname: nil,
            ai_addr: nil,
            ai_next: nil)

        var servinfo: UnsafeMutablePointer<addrinfo>? = nil
        let addrInfoResult = getaddrinfo(
            nil,                        // Any interface
            servicePort,                   // The port on which will be listenend
            &hints,                     // Protocol configuration as per above
            &servinfo)

        if addrInfoResult != 0 {
            print("Error getting address info: \(errno)")
            return
        }

        let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))

        if bindResult == -1 {
            print("Error binding socket to Address: \(errno)")
            return
        }

        let listenResult = listen(socketFD, //Socket File descriptor
            8         // The backlog argument defines the maximum length the queue of pending connections may grow to
        )

        if listenResult == -1 {
            print("Error setting our socket to listen")
            return
        }

        while true {
            var addr = sockaddr()
            var addr_len :socklen_t = 0

            print("About to accept")
            let clientFD = accept(socketFD, &addr, &addr_len)
            print("Accepted new client with file descriptor: \(clientFD)")

            if clientFD == -1 {
                print("Error accepting connection")
                continue
            }
            clients.append(clientFD)

            setSockKqueue(fd: clientFD)
        }
    }
}

Ok, now we can run our server in one shell. Attach one or many clients to it, and send a kill command to the server PID.

1
2
3
4
5
6
7
# run our server
$ swift run
#Welcome to our simple echo server!
#Setting up Signal Handler
#Server starting...
#About to accept

Attach a client to it:

1
2
# I'm using our implementation of ncat
$ rdncat rderik.local 1234

And we can kill the process rdkqueue. You can find your PID by using ps or by any other means you regularly use. If you don't have any preference, you could use the following command: (if you used a different name change accordingly)

1
$ kill `ps aux | grep rdkqueue | awk '{printf "%d ",$2;}'`

And you should receive a message on your client before the server was shutdown. Perfect!

Alright, socket and signal events handled using kqueue, let's do file system.

Monitoring changes in the filesystem using kqueue

Before anyone sends me a message saying "FSEvent already does this", yea I know.

Ok, let's continue. This is going to be a contrived example, but I'm running out of ideas here so be patient.

If you've used passenger for your web server needs, you might have noticed that you can restart your passenger instance by executing the command touch tmp/restart.txt. Passenger is "somehow" observing that file, and if the file is modified, it triggers a restart.

The touch command changes the modified time of a file. We'll observe a specific file also, and if any of the attributes of the file (modified date for example) changes we'll get a notification. We are not going to restart our server, only change the prompt used to separate what the client sent:

1
Received form client(12) % howdy

You see the % symbol, that is our "prompt". We'll cycle through three different prompts ["%", "$", ">"] every time the file change_prompt.txt gets changed.

All the changes will be in our Server.swift. So first, we'll need to add the prompts and the current prompt we are using:

1
2
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0

In the function readFrom where we display the prompt we'll set to display the current prompt:

1
2
3
...
        print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
...

By now you should be familiar with the whole kqueue process so I'll show you the entire function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
    func setReloadPromptObserver() {
        let reloadKqueue = kqueue()
        if reloadKqueue == -1 {
            print("Error creating prompt kqueue")
            exit(EXIT_FAILURE)
        }

        let fileTrigger = open(FileManager.default.fileSystemRepresentation(withPath: "./change_prompt.txt"), O_EVTONLY)
        guard fileTrigger >= 0 else {
            print("Error: there was an error reading ./change_prompt.txt")
            return
        }

        var fileKevent = kevent(
            ident: UInt(fileTrigger),
            filter: Int16(EVFILT_VNODE),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: UInt32(NOTE_ATTRIB),
            data: 0,
            udata: nil
        )

        kevent(reloadKqueue, &fileKevent, 1, nil, 0, nil)

        DispatchQueue.global(qos: .default).async {
            var event = kevent()
                let status = kevent(reloadKqueue, nil, 0, &event, 1, nil)
                self.currentPrompt = (self.currentPrompt + 1) % self.prompts.count
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {

                } else {
                    print("Error reading kevent")
                    close(reloadKqueue)
                }
            self.setReloadPromptObserver()
        }
        print("Completed Set Reload Prompt")
    }  

We get a file descriptor using the open system call, and that is what we are going to pass to our kevent structure as ident. Also, we are going to use the filter EVFILT_VNODE. The EVFILT_VNODE supports filter flags, where we are going to specify what to look for. We are going to tell it to look for changes in the attributes by passing the flag NOTE_ATTRIB. The rest is the same as before. We will only increase the currentPromt so it cycles to the existing prompt options.

Ok, now we add the call to setReloadPromptObserver() in our start function.

1
2
...setReloadPromptObserver()
...

And that's it. You can see next the whole Server.swift code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import Foundation

class Server {

    let servicePort = "1234"
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0
    var clients = [Int32]() //File descriptors are represented by Int32

    func setReloadPromptObserver() {
        let reloadKqueue = kqueue()
        if reloadKqueue == -1 {
            print("Error creating prompt kqueue")
            exit(EXIT_FAILURE)
        }

        let fileTrigger = open(FileManager.default.fileSystemRepresentation(withPath: "./change_prompt.txt"), O_EVTONLY)
        guard fileTrigger >= 0 else {
            print("Error: there was an error reading ./change_prompt.txt")
            return
        }

        var fileKevent = kevent(
            ident: UInt(fileTrigger),
            filter: Int16(EVFILT_VNODE),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: UInt32(NOTE_ATTRIB),
            data: 0,
            udata: nil
        )

        kevent(reloadKqueue, &fileKevent, 1, nil, 0, nil)

        DispatchQueue.global(qos: .default).async {
            var event = kevent()
                let status = kevent(reloadKqueue, nil, 0, &event, 1, nil)
                self.currentPrompt = (self.currentPrompt + 1) % self.prompts.count
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {

                } else {
                    print("Error reading kevent")
                    close(reloadKqueue)
                }
            self.setReloadPromptObserver()
        }
        print("Completed Set Reload Prompt")
    }

    func stop() {
        for client in clients {
            writeTo(socket: client, message: "Server shutting down")
            close(client)
        }
    }

    func writeTo(socket fd: Int32, message: String) {
        write(fd, message.cString(using: .utf8), message.count)
    }

    func readFrom(socket fd: Int32) {
      let MTU = 65536
      var buffer = UnsafeMutableRawPointer.allocate(byteCount: MTU,alignment: MemoryLayout<CChar>.size)

      let readResult = read(fd, &buffer, MTU)

      if (readResult == 0) {
        return  // end of file
      } else if (readResult == -1) {
        print("Error reading form client\(fd) - \(errno)")
        return  // error
      } else {
        //This is an ugly way to add the null-terminator at the end of the buffer we just read
        withUnsafeMutablePointer(to: &buffer) {
          $0.withMemoryRebound(to: UInt8.self, capacity: readResult + 1) {
            $0.advanced(by: readResult).assign(repeating: 0, count: 1)
          }
        }
        let strResult = withUnsafePointer(to: &buffer) {
          $0.withMemoryRebound(to: CChar.self, capacity: MemoryLayout.size(ofValue: readResult)) {
            String(cString: $0)
          }
        }
        print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
        write(fd, &buffer, readResult)
      }
    }

    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }


        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is register with our 
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)


        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                if  status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        if let index = self.clients.firstIndex(of: fd) {
                            self.clients.remove(at: index)
                        }
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }
    }

    func start() {
        print("Server starting...")

        let socketFD = socket(AF_INET6, //Domain [AF_INET,AF_INET6, AF_UNIX]
            SOCK_STREAM, //Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
            IPPROTO_TCP  //Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
        )//Return a FileDescriptor -1 = error
        if socketFD == -1 {
            print("Error creating BSD Socket")
            return
        }

        var hints = addrinfo(
            ai_flags: AI_PASSIVE,       // Assign the address of the local host to the socket structures
            ai_family: AF_UNSPEC,       // Either IPv4 or IPv6
            ai_socktype: SOCK_STREAM,   // TCP
            ai_protocol: 0,
            ai_addrlen: 0,
            ai_canonname: nil,
            ai_addr: nil,
            ai_next: nil)

        var servinfo: UnsafeMutablePointer<addrinfo>? = nil
        let addrInfoResult = getaddrinfo(
            nil,                        // Any interface
            servicePort,                   // The port on which will be listenend
            &hints,                     // Protocol configuration as per above
            &servinfo)

        if addrInfoResult != 0 {
            print("Error getting address info: \(errno)")
            return
        }

        let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))

        if bindResult == -1 {
            print("Error binding socket to Address: \(errno)")
            return
        }

        let listenResult = listen(socketFD, //Socket File descriptor
            8         // The backlog argument defines the maximum length the queue of pending connections may grow to
        )

        if listenResult == -1 {
            print("Error setting our socket to listen")
            return
        }

        while true {
            var addr = sockaddr()
            var addr_len :socklen_t = 0

            print("About to accept")
            let clientFD = accept(socketFD, &addr, &addr_len)
            print("Accepted new client with file descriptor: \(clientFD)")

            if clientFD == -1 {
                print("Error accepting connection")
                continue
            }
            clients.append(clientFD)

            setSockKqueue(fd: clientFD)
            setReloadPromptObserver()
        }
    }
}

Before we run the server, create the file change_prompt.txt in your project directory.

1
$ touch change_prompt.txt

Now you can run the server:

1
$ swift run

In a different shell, connect a client:

1
$ rdncat rderik.local 1234

When the server receives a message from the client, you should see our default % prompt. In a different shell change the modification time again using touch.

1
$ touch change_prompt.txt

Send another message from the client and see how the prompt has changed.

And that's it.

* You can check the full code in the GitHub Repository

Final thoughts

You've used kqueue to track events from sockets, signals, and changes in the file system. By now, you can see how useful this mechanism is. Many higher-level APIs/Frameworks provide similar functionality, but probably many of them are based on kqueue (e.g. GCD (Grand Central Dispatch)).

I encourage you to read about the other filters. They can come in handy in some of your programs.

Alright, thanks for reading. I hope you find it useful, and as always, feedback is welcome.

Related topics/notes of interest


  1. man 2 select 

  2. man 2 poll 

  3. The filter field of the kevent struct helps us define which type of event we are going to be using. Check select(2)'s man page in the filter section for all the supported events. 

  4. man 2 kqueue 

  5. man 2 kqueue 

  6. man 2 kqueue 


** If you want to check what else I'm currently doing, be sure to follow me on twitter @rderik or subscribe to the newsletter. If you want to send me a direct message, you can send it to derik@rderik.com.