Using Kernel Queues (kqueue) notifications in Swift
Oct 9 2019, Latest Update: Jun 27 2020
Many components of macOS trace their roots back to BSD. One key aspect inherited from BSD is the Kernel Event Notification mechanism known as Kernel Queues (kqueues for short).
In the past, before kqueue(2)[1]/kevent(2)[2], when we wanted to track events related to a socket, or find out whether a file descriptor had changed, we used a polling mechanism to test for readiness/events (maybe using select(2) or poll(2)). These polling techniques were inefficient and were replaced by the more efficient kqueue API. Instead of continually polling, we can now be notified about events by the kernel. Also, mechanisms like select(2) and poll(2) were restricted to working with file descriptors. kqueue widens the range of operating system events we can monitor: not only file descriptors but also signals, timers, network devices, etc.
In this post, we are going to explore how to use kqueue from Swift. The examples will include monitoring sockets, signals, and file system events. kqueue also supports monitoring events from timers, processes, network devices, memory, and a few others[3]. We are going to focus on three cases (sockets, signals, filesystem), but using those three cases as a base, you'll be able to use kqueue with any of the supported event types.
Let's begin by exploring the idea behind kqueue.
* You can check the full code in the GitHub Repository
Using kernel event notifications
As we mentioned before, it is inefficient to poll for events, and sometimes we can't obtain the event directly. So how can we know if an event occurred?
Every system call goes through the kernel, so the kernel is the perfect place to get notifications from.
That is the idea behind kqueue. We create a kernel event queue and tell the kernel which events we are interested in; when the kernel has information about one of those events, it pushes it to our queue.
The process is as follows:
- Create a kernel event queue using the system call kqueue; this returns a file descriptor.
- We set up our kqueue to register our interest in specific events by using the system call kevent.
- Again we use kevent, but this time to listen for the events to be triggered.
That's it. Simple, right?
There is more to it, but that is the skeleton from which we'll build our understanding of kqueues.
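As a preview, the three steps can be sketched with one of the event types this post does not cover in depth, a timer. This is only a minimal sketch for macOS: the helper name waitForTimer is made up for illustration, and EVFILT_TIMER and EV_ONESHOT come from the kevent man page rather than from the server code below. The kevent structure it fills in is explained field by field later in the post.

```swift
import Foundation

// Hypothetical helper (not part of the post's server): arms a one-shot
// kqueue timer and waits for it. Returns what kevent(2) returned while
// waiting: 1 if the timer fired, 0 on timeout, -1 on error.
func waitForTimer(milliseconds: Int) -> Int32 {
    // Step 1: create the kernel event queue
    let queue = kqueue()
    guard queue != -1 else { return -1 }
    defer { close(queue) }

    // Step 2: register our interest. For EVFILT_TIMER, ident is an arbitrary
    // id we choose and data is the period (milliseconds by default).
    // EV_ONESHOT removes the event from the queue after it fires once.
    var timerEvent = kevent(
        ident: 1,
        filter: Int16(EVFILT_TIMER),
        flags: UInt16(EV_ADD | EV_ENABLE | EV_ONESHOT),
        fflags: 0,
        data: milliseconds,
        udata: nil
    )
    guard kevent(queue, &timerEvent, 1, nil, 0, nil) != -1 else { return -1 }

    // Step 3: listen for the event. The 5 second timeout is only a safety
    // net so this call can never block forever.
    var triggered = kevent()
    var timeout = timespec(tv_sec: 5, tv_nsec: 0)
    return kevent(queue, nil, 0, &triggered, 1, &timeout)
}

print("kevent returned: \(waitForTimer(milliseconds: 100))")
```

The same create, register, listen shape is what the socket, signal, and file examples follow; only ident, filter, and the filter-specific fields change.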
For our example, we are going to use a socket-based server. I'll use the same code for the echo server we created in the post Using BSD sockets in Swift. I'll focus on the code related to kqueue; if you have any questions about the server code, check the original post.
Ok, let's get started by monitoring socket events.
Monitoring socket events using kqueue
In our previous implementation of the echo server, we could work with only one client at a time. Now let's change that and support multiple clients at the same time using GCD. Also, we are going to set up an event listener (using kevent) that will log how many characters are ready for reading on a specific socket (file descriptor). After getting the notification, we'll continue with what we had previously implemented: read the data and echo it back to the client.
First, we are going to encapsulate the code that handles reading from the socket, so that it doesn't clutter the rest of our code. The function will look like this:
    func readFrom(socket fd: Int32) {
        let MTU = 65536
        // One extra byte leaves room for the null terminator added below
        let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: MTU + 1)
        defer { buffer.deallocate() }
        // Pass the pointer itself: `&buffer` would hand read(2) the address
        // of the pointer variable instead of the allocated memory
        let readResult = read(fd, buffer, MTU)
        if readResult == 0 {
            return // end of file
        } else if readResult == -1 {
            print("Error reading from client(\(fd)) - \(errno)")
            return // error
        } else {
            // Null-terminate what we just read so it can be decoded as a C string
            buffer[readResult] = 0
            let strResult = String(cString: buffer)
            print("Received from client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
            // Echo the same bytes back to the client
            write(fd, buffer, readResult)
        }
    }
Now we can focus on the code for setting up our kqueue and kevent. We are going to create a function to handle the setup. Create the new function setSockKqueue with the following signature:
    func setSockKqueue(fd: Int32)
Remember our steps for setting up the kqueue:
- Create a kernel event queue using the system call kqueue; this returns a file descriptor.
- We set up our kqueue to register our interest in specific events by using the system call kevent.
- Again we use kevent, but this time to listen for the events to be triggered.
Ok, let's create our kqueue:
    let sockKqueue = kqueue()
    if sockKqueue == -1 {
        print("Error creating kqueue")
        exit(EXIT_FAILURE)
    }
The kqueue syscall returns a file descriptor. We can access our kernel queue using that file descriptor. If an error occurs, kqueue returns -1.
Now we have to register our interest in a specific event. We use the system call kevent to register an event with our kqueue. We also use the same system call, kevent, to read any pending events on the queue. The system call signature is the following:
    func kevent(_ kq: Int32, _ changelist: UnsafePointer<kevent>!,
                _ nchanges: Int32, _ eventlist: UnsafeMutablePointer<kevent>!,
                _ nevents: Int32, _ timeout: UnsafePointer<timespec>!) -> Int32
If we pass a kevent structure in the changelist parameter, the system call uses that structure's information to register our interest in the event it describes. If the changelist parameter is nil and we pass a kevent structure in the eventlist parameter, the system call waits (blocking) until an event matching any of our queue's interests is triggered and saves the event information in that structure. This process might sound confusing, so let's see it in our code to make things clear.
First, let's view the definition of the kevent structure:
    public struct kevent {
        public var ident: UInt /* identifier for this event */
        public var filter: Int16 /* filter for event */
        public var flags: UInt16 /* general flags */
        public var fflags: UInt32 /* filter-specific flags */
        public var data: Int /* filter-specific data */
        public var udata: UnsafeMutableRawPointer! /* opaque user data identifier */
        public init()
        public init(ident: UInt, filter: Int16, flags: UInt16, fflags: UInt32, data: Int, udata: UnsafeMutableRawPointer!)
    }
Let me explain each of the structure properties:
* Remember that the structure can be populated by you, or by the kevent system call. So some of the fields make sense only when written by the triggered event.
- ident: this property is used to identify our event. For example, it could be a file descriptor for a file, or a file descriptor for a socket, or a specific signal, etcetera. In combination with the filter property, we can determine what the ident property points to. For example, if the filter is EVFILT_SIGNAL, the ident property will point to a specific system signal.
- filter: identifies the filter to be used for this event. Next you'll see a few examples for this field; to see the system supported filters, check the man page for kevent[4].
  - EVFILT_READ - triggers when the file descriptor is ready for reading.
  - EVFILT_WRITE - triggers when the file descriptor is available for writing.
- flags: this property indicates the actions for this event. Next are a few examples for this field; to see the system supported flags, check the man page for kevent[5].
  - EV_ADD - adds the event to the kernel queue.
  - EV_ENABLE - allows kevent to return the event if the event is triggered.
  - EV_EOF - some filters set this flag to indicate End-Of-File conditions.
- fflags: filter-specific flags. For example, for the filter EVFILT_VNODE, we could have the following filter-specific flags (for a full list of filter-specific flags, see [6]):
  - NOTE_ATTRIB - triggers when the file descriptor had its attributes changed.
  - NOTE_WRITE - triggers when there was a write event on the file descriptor.
- data: filter-specific data. This field can be written by kevent when the event is triggered.
- udata: opaque user-defined value passed through the kernel unchanged.
Ok, let's create the structure we'll register in our kqueue:
    // Create the kevent structure that sets up our kqueue to listen
    // for notifications
    var sockKevent = kevent(
        ident: UInt(fd),
        filter: Int16(EVFILT_READ),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil
    )
Here we are using the file descriptor fd that we received as a parameter. We want to be notified when the socket (represented by the file descriptor) has data to be read. We set the actions EV_ADD and EV_ENABLE, which means: add this event to our kqueue and allow the event to be returned when triggered. We are not going to use fflags, data, and udata, so we set them to 0 or nil respectively.
Now we can call kevent with our kqueue and kevent structure to set up the kernel queue.
    // This is where the kqueue is registered with our
    // interest for the notifications described by
    // our kevent structure sockKevent
    kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
Ok, now we are going to use kevent to "listen" for events to be triggered.
We'll be running this code on a different thread using DispatchQueue asynchronously, so the main thread is not blocked.
    DispatchQueue.global(qos: .default).async {
        var event = kevent()
        while true {
            let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
            if status == 0 {
                print("Timeout")
            } else if status > 0 {
                if event.flags & UInt16(EV_EOF) != 0 {
                    print("The socket (\(fd)) has been closed.")
                    break
                }
                print("File descriptor: \(fd) - has \(event.data) characters for reading")
                self.readFrom(socket: fd)
            } else {
                print("Error reading kevent")
                close(sockKqueue)
                exit(EXIT_FAILURE)
            }
        }
        // The client is gone: release its socket and the kqueue watching it
        close(fd)
        close(sockKqueue)
        print("Bye from kevent")
    }
We won't go into detail on the DispatchQueue code. Let's focus on the code related to kevent.
We wait for any event on our sockKqueue to be triggered, and then we read the data that is populated in the event structure. Once we are sure that the socket has data, we call readFrom(socket:) to read it. Then we start the loop again and call kevent, which will block and wait for another event to be triggered. We could also have passed a timeout. An example could look like this:
    var timeout = timespec(tv_sec: 5, tv_nsec: 0)
    let status = kevent(sockKqueue, nil, 0, &event, 1, &timeout)
This code will cause kevent to time out after 5 seconds if no event arrives, and the return value stored in status would be 0.
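Registration and waiting do not have to be two separate calls: kevent accepts a changelist and an eventlist at the same time. The following is a minimal sketch of that, using a pipe instead of a socket so it runs on its own on macOS; the helper name readableBytes(afterWriting:) is invented for this illustration and is not part of the server.

```swift
import Foundation

// Hypothetical helper (not part of the post's server): writes `message`
// into a pipe and returns how many bytes kqueue reports as readable on the
// other end, or -1 if anything fails.
func readableBytes(afterWriting message: String) -> Int {
    var fds: [Int32] = [0, 0]
    guard pipe(&fds) == 0 else { return -1 }
    defer { close(fds[0]); close(fds[1]) }

    let queue = kqueue()
    guard queue != -1 else { return -1 }
    defer { close(queue) }

    // Put some data in the pipe so the read end becomes readable
    let bytes = Array(message.utf8)
    guard write(fds[1], bytes, bytes.count) == bytes.count else { return -1 }

    var change = kevent(
        ident: UInt(fds[0]),
        filter: Int16(EVFILT_READ),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil
    )
    var event = kevent()
    var timeout = timespec(tv_sec: 1, tv_nsec: 0)
    // One call: apply the changelist first, then wait for a pending event
    let status = kevent(queue, &change, 1, &event, 1, &timeout)
    guard status == 1 else { return -1 }
    // For EVFILT_READ, data holds the number of bytes available to read
    return event.data
}

print("Readable bytes: \(readableBytes(afterWriting: "hello"))")
```

The server keeps the two calls separate because it registers once and then waits in a loop, but the combined form can be handy for one-off checks like this one.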
Let's see the full body of our function:
    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }
        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is registered with our
        // interest for the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                // Debug output: the raw event as returned by kevent
                print("Status: \(status) - event after kevent: \(event)")
                if status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if event.flags & UInt16(EV_EOF) != 0 {
                        print("The socket (\(fd)) has been closed.")
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            // The client is gone: release its socket and the kqueue watching it
            close(fd)
            close(sockKqueue)
            print("Bye from kevent")
        }
    }
Ok, next you'll see the complete code for Server.swift:
    import Foundation

    class Server {
        let servicePort = "1234"
        let prompts = ["%", "$", ">"]
        var currentPrompt = 0

        func readFrom(socket fd: Int32) {
            let MTU = 65536
            // One extra byte leaves room for the null terminator added below
            let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: MTU + 1)
            defer { buffer.deallocate() }
            // Pass the pointer itself: `&buffer` would hand read(2) the address
            // of the pointer variable instead of the allocated memory
            let readResult = read(fd, buffer, MTU)
            if readResult == 0 {
                return // end of file
            } else if readResult == -1 {
                print("Error reading from client(\(fd)) - \(errno)")
                return // error
            } else {
                // Null-terminate what we just read so it can be decoded as a C string
                buffer[readResult] = 0
                let strResult = String(cString: buffer)
                print("Received from client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
                // Echo the same bytes back to the client
                write(fd, buffer, readResult)
            }
        }

        func setSockKqueue(fd: Int32) {
            let sockKqueue = kqueue()
            if sockKqueue == -1 {
                print("Error creating kqueue")
                exit(EXIT_FAILURE)
            }
            // Create the kevent structure that sets up our kqueue to listen
            // for notifications
            var sockKevent = kevent(
                ident: UInt(fd),
                filter: Int16(EVFILT_READ),
                flags: UInt16(EV_ADD | EV_ENABLE),
                fflags: 0,
                data: 0,
                udata: nil
            )
            // This is where the kqueue is registered with our
            // interest for the notifications described by
            // our kevent structure sockKevent
            kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
            DispatchQueue.global(qos: .default).async {
                var event = kevent()
                while true {
                    let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                    if status == 0 {
                        print("Timeout")
                    } else if status > 0 {
                        if event.flags & UInt16(EV_EOF) != 0 {
                            print("The socket (\(fd)) has been closed.")
                            break
                        }
                        print("File descriptor: \(fd) - has \(event.data) characters for reading")
                        self.readFrom(socket: fd)
                    } else {
                        print("Error reading kevent")
                        close(sockKqueue)
                        exit(EXIT_FAILURE)
                    }
                }
                // The client is gone: release its socket and the kqueue watching it
                close(fd)
                close(sockKqueue)
                print("Bye from kevent")
            }
        }

        func start() {
            print("Server starting...")
            let socketFD = socket(AF_INET6,    // Domain [AF_INET, AF_INET6, AF_UNIX]
                                  SOCK_STREAM, // Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
                                  IPPROTO_TCP  // Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
            ) // Returns a file descriptor, -1 = error
            if socketFD == -1 {
                print("Error creating BSD Socket")
                return
            }
            var hints = addrinfo(
                ai_flags: AI_PASSIVE,     // Assign the address of the local host to the socket structures
                ai_family: AF_UNSPEC,     // Either IPv4 or IPv6
                ai_socktype: SOCK_STREAM, // TCP
                ai_protocol: 0,
                ai_addrlen: 0,
                ai_canonname: nil,
                ai_addr: nil,
                ai_next: nil)
            var servinfo: UnsafeMutablePointer<addrinfo>? = nil
            let addrInfoResult = getaddrinfo(
                nil,         // Any interface
                servicePort, // The port to listen on
                &hints,      // Protocol configuration as per above
                &servinfo)
            if addrInfoResult != 0 {
                // getaddrinfo reports failures through its return value, not errno
                print("Error getting address info: \(String(cString: gai_strerror(addrInfoResult)))")
                return
            }
            let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))
            freeaddrinfo(servinfo) // the address list is no longer needed after bind
            if bindResult == -1 {
                print("Error binding socket to Address: \(errno)")
                return
            }
            let listenResult = listen(socketFD, // Socket file descriptor
                                      8         // Maximum length the queue of pending connections may grow to
            )
            if listenResult == -1 {
                print("Error setting our socket to listen")
                return
            }
            while true {
                var addr = sockaddr()
                // accept(2) expects the size of the address buffer on input
                var addr_len = socklen_t(MemoryLayout<sockaddr>.size)
                print("About to accept")
                let clientFD = accept(socketFD, &addr, &addr_len)
                if clientFD == -1 {
                    print("Error accepting connection")
                    continue // never register an invalid descriptor
                }
                print("Accepted new client with file descriptor: \(clientFD)")
                setSockKqueue(fd: clientFD)
            }
        }
    }
That's it. Everything is ready. We can test our kqueue code. Let's run our server:
    # if we are using Swift Package Manager
    $ swift run
On another shell, we can connect to our server (on port 1234) using ncat (or rdncat if you are using the one we created in the post about Network.framework). If we send two messages, hi and hello, we would see something similar to this:
    Welcome to our simple echo server!
    Server starting...
    About to accept
    Accepted new client with file descriptor: 7
    About to accept
    File descriptor: 7 - has 2 characters for reading
    Received from client(7) % hi
    File descriptor: 7 - has 5 characters for reading
    Received from client(7) % hello
    The socket (7) has been closed.
    Bye from kevent
You can see that our kevent is working correctly.
Let's now use kqueue/kevent to handle signals.
Using kqueue/kevent to listen for signals
If you are not familiar with signals, they are a basic form of IPC (Inter-Process Communication). When we send a signal to a process, the kernel interrupts the process to deliver it, and the process can react to the signal or ignore it (except SIGKILL and SIGSTOP, which can't be caught or ignored).
Some of the common signals are (check man 3 signal for the ones supported by your system):
    No  Name       Default Action     Description
    1   SIGHUP     terminate process  terminal line hangup
    2   SIGINT     terminate process  interrupt program
    3   SIGQUIT    create core image  quit program
    4   SIGILL     create core image  illegal instruction
    5   SIGTRAP    create core image  trace trap
    6   SIGABRT    create core image  abort program (formerly SIGIOT)
    7   SIGEMT     create core image  emulate instruction executed
    8   SIGFPE     create core image  floating-point exception
    9   SIGKILL    terminate process  kill program
    10  SIGBUS     create core image  bus error
    11  SIGSEGV    create core image  segmentation violation
    12  SIGSYS     create core image  non-existent system call invoked
    13  SIGPIPE    terminate process  write on a pipe with no reader
    14  SIGALRM    terminate process  real-time timer expired
    15  SIGTERM    terminate process  software termination signal
    16  SIGURG     discard signal     urgent condition present on socket
    17  SIGSTOP    stop process       stop (cannot be caught or ignored)
    18  SIGTSTP    stop process       stop signal generated from keyboard
    19  SIGCONT    discard signal     continue after stop
    20  SIGCHLD    discard signal     child status has changed
    21  SIGTTIN    stop process       background read attempted from control terminal
    22  SIGTTOU    stop process       background write attempted to control terminal
    23  SIGIO      discard signal     I/O is possible on a descriptor (see fcntl(2))
    24  SIGXCPU    terminate process  cpu time limit exceeded (see setrlimit(2))
    25  SIGXFSZ    terminate process  file size limit exceeded (see setrlimit(2))
    26  SIGVTALRM  terminate process  virtual time alarm (see setitimer(2))
    27  SIGPROF    terminate process  profiling timer alarm (see setitimer(2))
    28  SIGWINCH   discard signal     Window size change
    29  SIGINFO    discard signal     status request from keyboard
    30  SIGUSR1    terminate process  User defined signal 1
    31  SIGUSR2    terminate process  User defined signal 2
We can use the kill command on our shell to send signals to a specific PID, and that is what we are going to use: kill OUR_PROCESS_PID (kill sends SIGTERM by default). Our program will gracefully handle SIGTERM (a request to stop our program) by notifying all our active clients with the message "Server shutting down" and then exiting, instead of exiting abruptly.
Ok, we are going to work on our main.swift. We are going to create a specific function to set up our kqueue. We'll call it setSignalKqueue. The process is the same one we used for setting up the kqueue for our socket, so I'll show you the whole function:
    func setSignalKqueue() {
        print("Setting up Signal Handler")
        let signalKqueue = kqueue()
        if signalKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }
        // The signal API takes precedence over kqueue event handling.
        // To avoid the default action (terminating the process) we ignore
        // SIGTERM and handle it using our kqueue implementation
        signal(SIGTERM, SIG_IGN)
        var edit = kevent(
            ident: UInt(SIGTERM),
            filter: Int16(EVFILT_SIGNAL),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        kevent(signalKqueue, &edit, 1, nil, 0, nil)
        DispatchQueue.global(qos: .default).async {
            while true {
                var event = kevent()
                let status = kevent(signalKqueue, nil, 0, &event, 1, nil)
                if status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    print("We got a terminate signal! \(event)")
                    server.stop()
                    exit(130) // Exit code for "Script terminated by Control-C" (128 + SIGINT)
                } else {
                    print("Error reading kevent")
                    close(signalKqueue)
                    exit(EXIT_FAILURE)
                }
            }
        }
    }
Note that we tell the default signal API to ignore SIGTERM so that our kqueue implementation can handle it. Our kevent structure is similar to the one we created for our socket kqueue. The difference is in the ident and filter fields. We are using the filter EVFILT_SIGNAL, which tells the kernel that ident represents a signal we want to observe.
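To try the signal filter in isolation, here is a small sketch, assuming macOS, that sends SIGUSR1 to its own process and reads the resulting event back. The helper name countSelfSignal is hypothetical; SIGUSR1 is used instead of SIGTERM only so the experiment cannot be confused with a real shutdown request.

```swift
import Foundation

// Hypothetical helper (not part of the post's server): sends SIGUSR1 to the
// current process and returns how many deliveries kqueue recorded, or -1 if
// anything fails.
func countSelfSignal() -> Int {
    let queue = kqueue()
    guard queue != -1 else { return -1 }
    defer { close(queue) }

    // SIGUSR1 terminates the process by default, so ignore it in the
    // signal API and let kqueue be the only observer
    signal(SIGUSR1, SIG_IGN)

    var change = kevent(
        ident: UInt(SIGUSR1),
        filter: Int16(EVFILT_SIGNAL),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil
    )
    guard kevent(queue, &change, 1, nil, 0, nil) != -1 else { return -1 }

    // Same effect as running `kill -USR1 <pid>` from a shell
    kill(getpid(), SIGUSR1)

    var event = kevent()
    var timeout = timespec(tv_sec: 1, tv_nsec: 0)
    guard kevent(queue, nil, 0, &event, 1, &timeout) == 1 else { return -1 }
    // For EVFILT_SIGNAL, data counts how often the signal occurred since
    // the last time the event was retrieved
    return event.data
}

print("SIGUSR1 deliveries seen by kqueue: \(countSelfSignal())")
```

Because the event carries a count in data, several signals that arrive between two kevent calls are reported as one event rather than being lost.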
After the initialization, we wait for events asynchronously, and if we get a SIGTERM we'll call server.stop (which we still have to implement). Also, notice the exit code: 130 is the value shells conventionally report for "Script terminated by Control-C", that is, 128 plus the signal number of SIGINT (2); by the same convention, a SIGTERM (15) would be reported as 143. Exit codes are important, so always use the correct one; that way our programs behave as good *nix commands are expected to.
Alright, this is how our main should look:
    import Foundation

    print("Welcome to our simple echo server!")
    var server = Server()

    func setSignalKqueue() {
        print("Setting up Signal Handler")
        let signalKqueue = kqueue()
        if signalKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }
        // The signal API takes precedence over kqueue event handling.
        // To avoid the default action (terminating the process) we ignore
        // SIGTERM and handle it using our kqueue implementation
        signal(SIGTERM, SIG_IGN)
        var edit = kevent(
            ident: UInt(SIGTERM),
            filter: Int16(EVFILT_SIGNAL),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        kevent(signalKqueue, &edit, 1, nil, 0, nil)
        DispatchQueue.global(qos: .default).async {
            while true {
                var event = kevent()
                let status = kevent(signalKqueue, nil, 0, &event, 1, nil)
                if status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    print("We got a terminate signal! \(event)")
                    server.stop()
                    exit(130) // Exit code for "Script terminated by Control-C" (128 + SIGINT)
                } else {
                    print("Error reading kevent")
                    close(signalKqueue)
                    exit(EXIT_FAILURE)
                }
            }
        }
    }

    setSignalKqueue()
    server.start()
    RunLoop.main.run()
    exit(EXIT_SUCCESS)
Now let's add stop to our Server.swift.
First, we are going to add a new property to our Server: an array to keep track of all of the current clients.
    var clients = [Int32]() // File descriptors are represented by Int32
Now we need to add the clients to the array when we accept them. Inside the start function, after we accept the new client connection, we'll append it to our clients array:
    ...
    print("About to accept")
    let clientFD = accept(socketFD, &addr, &addr_len)
    if clientFD == -1 {
        print("Error accepting connection")
        continue
    }
    print("Accepted new client with file descriptor: \(clientFD)")
    clients.append(clientFD)
    ...
Now we have to remove the clients when the connection is closed. In the function that handles the socket kevent, we also receive a notification when the client socket gets closed, so that is where we are going to remove the client from the clients array.
    ...
    if event.flags & UInt16(EV_EOF) != 0 {
        print("The socket (\(fd)) has been closed.")
        if let index = self.clients.firstIndex(of: fd) {
            self.clients.remove(at: index)
        }
        break
    }
    ...
Now we are going to create a helper function to write to a specific socket:
    func writeTo(socket fd: Int32, message: String) {
        // Send the UTF-8 bytes: String.count counts characters, not bytes
        let bytes = Array(message.utf8)
        write(fd, bytes, bytes.count)
    }
With that we can implement our stop function:
    func stop() {
        for client in clients {
            writeTo(socket: client, message: "Server shutting down")
            close(client)
        }
    }
Here is the full code for Server.swift:
    import Foundation

    class Server {
        let servicePort = "1234"
        let prompts = ["%", "$", ">"]
        var currentPrompt = 0
        var clients = [Int32]() // File descriptors are represented by Int32

        func stop() {
            for client in clients {
                writeTo(socket: client, message: "Server shutting down")
                close(client)
            }
        }

        func writeTo(socket fd: Int32, message: String) {
            // Send the UTF-8 bytes: String.count counts characters, not bytes
            let bytes = Array(message.utf8)
            write(fd, bytes, bytes.count)
        }

        func readFrom(socket fd: Int32) {
            let MTU = 65536
            // One extra byte leaves room for the null terminator added below
            let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: MTU + 1)
            defer { buffer.deallocate() }
            // Pass the pointer itself: `&buffer` would hand read(2) the address
            // of the pointer variable instead of the allocated memory
            let readResult = read(fd, buffer, MTU)
            if readResult == 0 {
                return // end of file
            } else if readResult == -1 {
                print("Error reading from client(\(fd)) - \(errno)")
                return // error
            } else {
                // Null-terminate what we just read so it can be decoded as a C string
                buffer[readResult] = 0
                let strResult = String(cString: buffer)
                print("Received from client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
                // Echo the same bytes back to the client
                write(fd, buffer, readResult)
            }
        }

        func setSockKqueue(fd: Int32) {
            let sockKqueue = kqueue()
            if sockKqueue == -1 {
                print("Error creating kqueue")
                exit(EXIT_FAILURE)
            }
            // Create the kevent structure that sets up our kqueue to listen
            // for notifications
            var sockKevent = kevent(
                ident: UInt(fd),
                filter: Int16(EVFILT_READ),
                flags: UInt16(EV_ADD | EV_ENABLE),
                fflags: 0,
                data: 0,
                udata: nil
            )
            // This is where the kqueue is registered with our
            // interest for the notifications described by
            // our kevent structure sockKevent
            kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
            DispatchQueue.global(qos: .default).async {
                var event = kevent()
                while true {
                    let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                    if status == 0 {
                        print("Timeout")
                    } else if status > 0 {
                        if event.flags & UInt16(EV_EOF) != 0 {
                            print("The socket (\(fd)) has been closed.")
                            if let index = self.clients.firstIndex(of: fd) {
                                self.clients.remove(at: index)
                            }
                            break
                        }
                        print("File descriptor: \(fd) - has \(event.data) characters for reading")
                        self.readFrom(socket: fd)
                    } else {
                        print("Error reading kevent")
                        close(sockKqueue)
                        exit(EXIT_FAILURE)
                    }
                }
                // The client is gone: release its socket and the kqueue watching it
                close(fd)
                close(sockKqueue)
                print("Bye from kevent")
            }
        }

        func start() {
            print("Server starting...")
            let socketFD = socket(AF_INET6,    // Domain [AF_INET, AF_INET6, AF_UNIX]
                                  SOCK_STREAM, // Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
                                  IPPROTO_TCP  // Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
            ) // Returns a file descriptor, -1 = error
            if socketFD == -1 {
                print("Error creating BSD Socket")
                return
            }
            var hints = addrinfo(
                ai_flags: AI_PASSIVE,     // Assign the address of the local host to the socket structures
                ai_family: AF_UNSPEC,     // Either IPv4 or IPv6
                ai_socktype: SOCK_STREAM, // TCP
                ai_protocol: 0,
                ai_addrlen: 0,
                ai_canonname: nil,
                ai_addr: nil,
                ai_next: nil)
            var servinfo: UnsafeMutablePointer<addrinfo>? = nil
            let addrInfoResult = getaddrinfo(
                nil,         // Any interface
                servicePort, // The port to listen on
                &hints,      // Protocol configuration as per above
                &servinfo)
            if addrInfoResult != 0 {
                // getaddrinfo reports failures through its return value, not errno
                print("Error getting address info: \(String(cString: gai_strerror(addrInfoResult)))")
                return
            }
            let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))
            freeaddrinfo(servinfo) // the address list is no longer needed after bind
            if bindResult == -1 {
                print("Error binding socket to Address: \(errno)")
                return
            }
            let listenResult = listen(socketFD, // Socket file descriptor
                                      8         // Maximum length the queue of pending connections may grow to
            )
            if listenResult == -1 {
                print("Error setting our socket to listen")
                return
            }
            while true {
                var addr = sockaddr()
                // accept(2) expects the size of the address buffer on input
                var addr_len = socklen_t(MemoryLayout<sockaddr>.size)
                print("About to accept")
                let clientFD = accept(socketFD, &addr, &addr_len)
                if clientFD == -1 {
                    print("Error accepting connection")
                    continue
                }
                print("Accepted new client with file descriptor: \(clientFD)")
                clients.append(clientFD)
                setSockKqueue(fd: clientFD)
            }
        }
    }
Ok, now we can run our server in one shell. Attach one or many clients to it, and send a kill command to the server PID.
    # run our server
    $ swift run
    #Welcome to our simple echo server!
    #Setting up Signal Handler
    #Server starting...
    #About to accept
Attach a client to it:
    # I'm using our implementation of ncat
    $ rdncat rderik.local 1234
And we can kill the process rdkqueue. You can find your PID by using ps or by any other means you regularly use. If you don't have any preference, you could use the following command (if you used a different name, change it accordingly):
    $ kill `ps aux | grep rdkqueue | awk '{printf "%d ",$2;}'`
And you should receive a message on your client before the server shuts down. Perfect!
Alright, socket and signal events handled using kqueue; let's do the file system.
Monitoring changes in the filesystem using kqueue
Before anyone sends me a message saying "FSEvent already does this", yea I know.
Ok, let's continue. This is going to be a contrived example, but I'm running out of ideas here so be patient.
If you've used passenger for your web server needs, you might have noticed that you can restart your passenger instance by executing the command touch tmp/restart.txt. Passenger is "somehow" observing that file, and if the file is modified, it triggers a restart.
The touch command changes the modified time of a file. We'll also observe a specific file, and if any of its attributes (the modified date, for example) changes, we'll get a notification. We are not going to restart our server, only change the prompt used to separate what the client sent:
    Received from client(12) % howdy
You see the % symbol; that is our "prompt". We'll cycle through three different prompts ["%", "$", ">"] every time the file change_prompt.txt gets changed.
All the changes will be in our Server.swift. So first, we'll need to add the prompts and the current prompt we are using:
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0
In the function readFrom, where we print what we received, we'll display the current prompt:
    ...
    print("Received from client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
    ...
By now you should be familiar with the whole kqueue process, so I'll show you the entire function:
func setReloadPromptObserver() {
    let reloadKqueue = kqueue()
    if reloadKqueue == -1 {
        print("Error creating prompt kqueue")
        exit(EXIT_FAILURE)
    }
    let fileTrigger = open(FileManager.default.fileSystemRepresentation(withPath: "./change_prompt.txt"), O_EVTONLY)
    guard fileTrigger >= 0 else {
        print("Error: there was an error reading ./change_prompt.txt")
        close(reloadKqueue)
        return
    }
    var fileKevent = kevent(
        ident: UInt(fileTrigger),
        filter: Int16(EVFILT_VNODE),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: UInt32(NOTE_ATTRIB),
        data: 0,
        udata: nil
    )
    kevent(reloadKqueue, &fileKevent, 1, nil, 0, nil)
    DispatchQueue.global(qos: .default).async {
        var event = kevent()
        // Block until the attributes of the file change
        let status = kevent(reloadKqueue, nil, 0, &event, 1, nil)
        if status > 0 {
            // Only advance the prompt when an event was actually delivered
            self.currentPrompt = (self.currentPrompt + 1) % self.prompts.count
        } else if status == 0 {
            print("Timeout")
        } else {
            print("Error reading kevent")
        }
        // Release both descriptors; the next call opens fresh ones
        close(fileTrigger)
        close(reloadKqueue)
        self.setReloadPromptObserver()
    }
    print("Completed Set Reload Prompt")
}
We get a file descriptor using the open system call (with O_EVTONLY, which requests a descriptor for event notifications only), and that is what we pass to our kevent structure as ident. Also, we are going to use the filter EVFILT_VNODE. The EVFILT_VNODE filter supports filter flags (fflags), where we specify what to look for. We tell it to look for changes in the attributes by passing the flag NOTE_ATTRIB. The rest is the same as before. We only increase currentPrompt so it cycles through the existing prompt options.
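To see the EVFILT_VNODE registration in isolation, here is a minimal sketch that separates it from the server. The names AttributeWatch, watchAttributes, waitForChange, and stopWatching are hypothetical and not part of the post's code. It also makes two choices the function above does not: it adds EV_CLEAR so the same kqueue can be waited on more than once, and it passes a timeout to kevent instead of blocking forever.

```swift
import Foundation

// Hypothetical type for this sketch; it is not part of the post's Server.swift.
struct AttributeWatch {
    let queue: Int32 // descriptor returned by kqueue()
    let file: Int32  // descriptor of the watched file
}

// Registers interest in attribute changes (NOTE_ATTRIB) for the file at `path`.
// Returns nil if creating the kqueue, opening the file, or registering fails.
func watchAttributes(ofFileAt path: String) -> AttributeWatch? {
    let queue = kqueue()
    guard queue != -1 else { return nil }

    // O_EVTONLY asks for a descriptor that is only used for event notifications.
    let file = open(path, O_EVTONLY)
    guard file >= 0 else {
        close(queue)
        return nil
    }

    var change = kevent(
        ident: UInt(file),
        filter: Int16(EVFILT_VNODE),
        // EV_CLEAR resets the event state once it has been retrieved,
        // so the same kqueue can be waited on again.
        flags: UInt16(EV_ADD | EV_ENABLE | EV_CLEAR),
        fflags: UInt32(NOTE_ATTRIB),
        data: 0,
        udata: nil
    )
    guard kevent(queue, &change, 1, nil, 0, nil) != -1 else {
        close(file)
        close(queue)
        return nil
    }
    return AttributeWatch(queue: queue, file: file)
}

// Waits up to `seconds` for an attribute change.
// Returns true if the kernel reported NOTE_ATTRIB, false on timeout or error.
func waitForChange(_ watch: AttributeWatch, seconds: Int) -> Bool {
    var event = kevent()
    var timeout = timespec(tv_sec: seconds, tv_nsec: 0)
    let status = kevent(watch.queue, nil, 0, &event, 1, &timeout)
    return status > 0 && (event.fflags & UInt32(NOTE_ATTRIB)) != 0
}

// Releases both descriptors held by the watch.
func stopWatching(_ watch: AttributeWatch) {
    close(watch.file)
    close(watch.queue)
}
```

With these helpers, you would call watchAttributes(ofFileAt: "./change_prompt.txt") once and then call waitForChange in a loop, instead of creating a new kqueue for every change.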
Ok, now we add the call to setReloadPromptObserver() in our start function.
...
setReloadPromptObserver()
...
And that's it. Next, you can see the whole Server.swift code:
import Foundation

class Server {
    let servicePort = "1234"
    let prompts = ["%", "$", ">"]
    var currentPrompt = 0
    var clients = [Int32]() // File descriptors are represented by Int32

    func setReloadPromptObserver() {
        let reloadKqueue = kqueue()
        if reloadKqueue == -1 {
            print("Error creating prompt kqueue")
            exit(EXIT_FAILURE)
        }
        let fileTrigger = open(FileManager.default.fileSystemRepresentation(withPath: "./change_prompt.txt"), O_EVTONLY)
        guard fileTrigger >= 0 else {
            print("Error: there was an error reading ./change_prompt.txt")
            close(reloadKqueue)
            return
        }
        var fileKevent = kevent(
            ident: UInt(fileTrigger),
            filter: Int16(EVFILT_VNODE),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: UInt32(NOTE_ATTRIB),
            data: 0,
            udata: nil
        )
        kevent(reloadKqueue, &fileKevent, 1, nil, 0, nil)
        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            // Block until the attributes of the file change
            let status = kevent(reloadKqueue, nil, 0, &event, 1, nil)
            if status > 0 {
                // Only advance the prompt when an event was actually delivered
                self.currentPrompt = (self.currentPrompt + 1) % self.prompts.count
            } else if status == 0 {
                print("Timeout")
            } else {
                print("Error reading kevent")
            }
            // Release both descriptors; the next call opens fresh ones
            close(fileTrigger)
            close(reloadKqueue)
            self.setReloadPromptObserver()
        }
        print("Completed Set Reload Prompt")
    }

    func stop() {
        for client in clients {
            writeTo(socket: client, message: "Server shutting down")
            close(client)
        }
    }

    func writeTo(socket fd: Int32, message: String) {
        // Use the UTF-8 byte count: message.count counts characters, not bytes
        write(fd, message.cString(using: .utf8), message.utf8.count)
    }

    func readFrom(socket fd: Int32) {
        let MTU = 65536
        // One extra byte so there is always room for the null terminator
        let buffer = UnsafeMutableRawPointer.allocate(byteCount: MTU + 1, alignment: MemoryLayout<CChar>.alignment)
        defer { buffer.deallocate() }
        // Pass the pointer itself (not &buffer) so read fills the allocated memory
        let readResult = read(fd, buffer, MTU)
        if readResult == 0 {
            return // end of file
        } else if readResult == -1 {
            print("Error reading form client\(fd) - \(errno)")
            return // error
        } else {
            // Add the null terminator right after the bytes we just read
            buffer.storeBytes(of: 0, toByteOffset: readResult, as: UInt8.self)
            let strResult = String(cString: buffer.assumingMemoryBound(to: CChar.self))
            print("Received form client(\(fd)) \(self.prompts[self.currentPrompt]) \(strResult)")
            // Echo the same bytes back to the client
            write(fd, buffer, readResult)
        }
    }

    func setSockKqueue(fd: Int32) {
        let sockKqueue = kqueue()
        if sockKqueue == -1 {
            print("Error creating kqueue")
            exit(EXIT_FAILURE)
        }
        // Create the kevent structure that sets up our kqueue to listen
        // for notifications
        var sockKevent = kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil
        )
        // This is where the kqueue is registered with our
        // interest in the notifications described by
        // our kevent structure sockKevent
        kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
        DispatchQueue.global(qos: .default).async {
            var event = kevent()
            while true {
                let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
                if status == 0 {
                    print("Timeout")
                } else if status > 0 {
                    if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                        if let index = self.clients.firstIndex(of: fd) {
                            self.clients.remove(at: index)
                        }
                        break
                    }
                    print("File descriptor: \(fd) - has \(event.data) characters for reading")
                    self.readFrom(socket: fd)
                } else {
                    print("Error reading kevent")
                    close(sockKqueue)
                    exit(EXIT_FAILURE)
                }
            }
            print("Bye from kevent")
        }
    }

    func start() {
        print("Server starting...")
        let socketFD = socket(AF_INET6,    // Domain [AF_INET, AF_INET6, AF_UNIX]
                              SOCK_STREAM, // Type [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET, SOCK_RAW]
                              IPPROTO_TCP  // Protocol [IPPROTO_TCP, IPPROTO_SCTP, IPPROTO_UDP, IPPROTO_DCCP]
        ) // Returns a file descriptor, -1 = error
        if socketFD == -1 {
            print("Error creating BSD Socket")
            return
        }
        var hints = addrinfo(
            ai_flags: AI_PASSIVE,     // Assign the address of the local host to the socket structures
            ai_family: AF_UNSPEC,     // Either IPv4 or IPv6
            ai_socktype: SOCK_STREAM, // TCP
            ai_protocol: 0,
            ai_addrlen: 0,
            ai_canonname: nil,
            ai_addr: nil,
            ai_next: nil)
        var servinfo: UnsafeMutablePointer<addrinfo>? = nil
        let addrInfoResult = getaddrinfo(
            nil,         // Any interface
            servicePort, // The port we will listen on
            &hints,      // Protocol configuration as per above
            &servinfo)
        if addrInfoResult != 0 {
            // getaddrinfo reports failures through its return value, not errno
            print("Error getting address info: \(String(cString: gai_strerror(addrInfoResult)))")
            return
        }
        let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))
        if bindResult == -1 {
            print("Error binding socket to Address: \(errno)")
            return
        }
        let listenResult = listen(socketFD, // Socket file descriptor
                                  8         // The backlog argument defines the maximum length the queue of pending connections may grow to
        )
        if listenResult == -1 {
            print("Error setting our socket to listen")
            return
        }
        // Register the prompt observer once, before we start accepting clients;
        // registering it per client would advance the prompt once per connection
        setReloadPromptObserver()
        while true {
            var addr = sockaddr()
            var addr_len: socklen_t = 0
            print("About to accept")
            let clientFD = accept(socketFD, &addr, &addr_len)
            if clientFD == -1 {
                print("Error accepting connection")
                continue
            }
            print("Accepted new client with file descriptor: \(clientFD)")
            clients.append(clientFD)
            setSockKqueue(fd: clientFD)
        }
    }
}
Before we run the server, create the file change_prompt.txt in your project directory.
$ touch change_prompt.txt
Now you can run the server:
$ swift run
In a different shell, connect a client:
$ rdncat rderik.local 1234
When the server receives a message from the client, you should see our default % prompt. In a different shell, change the modification time again using touch.
$ touch change_prompt.txt
Send another message from the client and see how the prompt has changed.
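If you would rather trigger the change from Swift than from the shell, a minimal sketch could look like the following. The helper name touchFile is hypothetical, and unlike touch it does not create the file when it is missing. Updating the modification date changes the file's metadata, so it should be reported to the observer as an attribute change.

```swift
import Foundation

// Hypothetical helper for illustration; not part of the post's code.
// Mimics `touch` on an existing file by setting its modification date to now.
// Throws if the file does not exist or its attributes cannot be changed.
func touchFile(atPath path: String) throws {
    try FileManager.default.setAttributes(
        [.modificationDate: Date()],
        ofItemAtPath: path
    )
}
```

For example, calling try touchFile(atPath: "./change_prompt.txt") from another small program should have the same effect on the server as the touch command above.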
And that's it.
* You can check the full code in the GitHub Repository
Final thoughts
You've used kqueue to track events from sockets, signals, and changes in the file system. By now, you can see how useful this mechanism is. Many higher-level APIs and frameworks provide similar functionality, and many of them are probably based on kqueue (e.g. GCD (Grand Central Dispatch)).
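As an illustration of one such higher-level API, here is a minimal sketch of the same attribute watch written with a GCD dispatch source. The helper name makeAttributeSource is hypothetical and not part of the post's code. DispatchSource.makeFileSystemObjectSource and the .attrib event mask are part of Dispatch on Apple platforms.

```swift
import Foundation

// Hypothetical helper for illustration; not part of the post's Server.swift.
// Runs `handler` on `queue` every time the attributes of the file at `path` change.
// Returns nil if the file cannot be opened.
func makeAttributeSource(path: String,
                         queue: DispatchQueue,
                         handler: @escaping () -> Void) -> DispatchSourceFileSystemObject? {
    // Same kind of descriptor we handed to kevent in the kqueue version.
    let fd = open(path, O_EVTONLY)
    guard fd >= 0 else { return nil }

    let source = DispatchSource.makeFileSystemObjectSource(
        fileDescriptor: fd,
        eventMask: .attrib, // attribute changes only
        queue: queue
    )
    source.setEventHandler { handler() }
    // Close the descriptor once the source has been cancelled.
    source.setCancelHandler { _ = close(fd) }
    source.resume()
    return source
}
```

You need to keep a reference to the returned source for as long as you want notifications, and call cancel() on it when you are done. There is no loop and no re-registration to write by hand, which is the main appeal of the higher-level API.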
I encourage you to read about the other filters. They can come in handy in some of your programs.
Alright, thanks for reading. I hope you find it useful, and as always, feedback is welcome.
Related topics/notes of interest
- Exit codes with special meaning
- If you are not familiar with Signals, this Wikipedia article should give you a good idea.
- This was the paper that introduced the Kernel notification mechanisms - A Scalable and Explicit Event Delivery Mechanism for UNIX
- A good read comparing epoll (the Linux implementation) to kqueues. If the first link is not available, check the web archive version.
- FreeBSD kqueue man
- Apple's WWDC20 session on Unsafe Swift - Explains when and how to use Unsafe types.
- Apple's WWDC20 session on Safely manage pointers in Swift
- The filter field of the kevent struct helps us define which type of event we are going to be using. Check kqueue(2)'s man page in the filter section for all the supported events.