tonsky/net.async

Network communications with clojure.core.async interface

This library is aimed at providing reliable, high-level, bidirectional message-oriented communication over TCP/IP.

  1. Send/receive messages, not data streams. It's what you need most of the time, anyway.
  2. Automatic reconnections when connection is lost.
  3. Stuck connection detection via heartbeats.
  4. Stable interface: chans stay valid to use no matter what the socket/network conditions are. net.async handles the underlying resource/socket management for you.
  5. No exceptions: disconnects are a normal state of a network connection and are exposed explicitly in the top-level interface.

Note: net.async is not a generic TCP library. You can’t, for example, implement SMTP with it. Instead, net.async is a message channel that uses TCP as its transport. It abstracts the details of TCP for you.

Protocol

Messages are encoded like this:

+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 4-byte message length | Message bytes |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Heartbeats are just messages with length == 0. The message length is encoded in big-endian byte order.
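To make the framing concrete, here is a minimal sketch of the wire format described above, written with plain java.nio. The names encode-frame and decode-frame are hypothetical helpers for illustration only; they are not part of net.async, which does this framing for you. decode-frame assumes it is handed exactly one complete frame.

```clojure
(import '[java.nio ByteBuffer])

(defn encode-frame
  "Hypothetical helper: prefixes payload with its length as a 4-byte integer.
   ByteBuffer writes in big-endian order by default."
  ^bytes [^bytes payload]
  (let [n   (alength payload)
        buf (ByteBuffer/allocate (+ 4 n))]
    (.putInt buf n)
    (.put buf payload)
    (.array buf)))

(defn decode-frame
  "Hypothetical helper: returns :heartbeat for a zero-length frame,
   otherwise the payload bytes. Assumes frame holds one complete message."
  [^bytes frame]
  (let [buf (ByteBuffer/wrap frame)
        n   (.getInt buf)]
    (if (zero? n)
      :heartbeat
      (let [payload (byte-array n)]
        (.get buf ^bytes payload)
        payload))))
```

Under this layout a heartbeat is just the four bytes 0 0 0 0, and (encode-frame (.getBytes "hi")) is six bytes long: the length 2 followed by the two payload bytes.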

Lein dependency

[net.async/async "0.1.1"]

Types

<event-loop>    is { :running? <atom [true/false]> }
<client-socket> is { :read-chan  <chan>
                     :write-chan <chan> }
<accept-socket> is { :accept-chan <chan> }
<payload>       is byte[]
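Since a payload is a raw byte[], text has to be converted at the edges. The following is a small sketch of one way to do that with an explicit charset, so both peers agree regardless of platform defaults. str->payload and payload->str are hypothetical helper names, not net.async functions, and UTF-8 is an assumption; use whatever encoding your peers agree on.

```clojure
(import '[java.nio.charset StandardCharsets])

(defn str->payload
  "Hypothetical helper: encodes a string as a UTF-8 byte array."
  ^bytes [^String s]
  (.getBytes s StandardCharsets/UTF_8))

(defn payload->str
  "Hypothetical helper: decodes a UTF-8 byte array back into a string."
  [^bytes payload]
  (String. payload StandardCharsets/UTF_8))
```

With these, (>!! write-chan (str->payload "hello")) sends text, and (payload->str msg) turns a received payload back into a string.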

Preparation

(require '[clojure.core.async :refer [<!! >!! <! >! go chan close! dropping-buffer sliding-buffer]])
(use 'net.async.tcp)

(event-loop) → <event-loop>

Client

(connect <event-loop> {:host "127.0.0.1" :port 9988}) → <client-socket>
(<!! read-chan) → :connected | :disconnected | :closed | <payload>
(>!! write-chan <payload>)
(close! write-chan)

The client automatically reconnects if it loses its connection to the server. During that period, you'll get :disconnected messages from read-chan. Once the connection is re-established, you'll get a :connected message and normal communication resumes.

As a side effect, you can start the client before you start the server. Once the server comes up, the client establishes the connection and starts sending/receiving messages, without any effort on your side.
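Because status keywords and payloads arrive on the same read-chan, consumer code usually has to tell them apart. Here is one possible sketch of such a loop. drain-read-chan is a hypothetical name, and the summary map it returns is purely illustrative. It stops on :closed or when the channel itself is closed (a nil read), whichever comes first.

```clojure
(require '[clojure.core.async :refer [<!! >!! chan]])

(defn drain-read-chan
  "Hypothetical helper: reads from read-chan until :closed arrives or the
   channel is closed. Calls on-payload for every non-keyword message and
   returns a summary map of what was seen."
  [read-chan on-payload]
  (loop [state {:connected? false :disconnected-msgs 0 :payloads 0}]
    (let [msg (<!! read-chan)]
      (cond
        ;; nil means the channel was closed; :closed is the explicit signal
        (or (nil? msg) (= :closed msg)) state
        (= :connected msg)    (recur (assoc state :connected? true))
        (= :disconnected msg) (recur (-> state
                                         (assoc :connected? false)
                                         (update :disconnected-msgs inc)))
        ;; anything else is treated as a byte[] payload
        :else (do (on-payload msg)
                  (recur (update state :payloads inc)))))))

;; Usage with a plain channel standing in for (:read-chan client):
(def demo-chan (chan 10))
(doseq [m [:disconnected :connected (.getBytes "ping") :closed]]
  (>!! demo-chan m))
(drain-read-chan demo-chan (fn [^bytes p] (println "got" (String. p))))
```

This blocks the calling thread, so in a real application it would typically run on its own thread or be rewritten with go and <!.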

Server

(accept <event-loop> {:port 9988}) → <accept-socket>
(<!! accept-chan)                  → <client-socket>
(close! accept-chan)

Shutting down

(shutdown! <event-loop>)

Configuring channels and heartbeats

(connect <event-loop> <addr>
  :read-chan        (chan (dropping-buffer 10))
  :write-chan       (chan (sliding-buffer 7))
  :reconnect-period  5000
  :heartbeat-period  8000
  :heartbeat-timeout 24000)

(accept <event-loop> <addr>
  :accept-chan      (chan 2)
  :read-chan-fn    #(chan (sliding-buffer 7))
  :write-chan-fn   #(chan (dropping-buffer 10))
  :heartbeat-period  8000
  :heartbeat-timeout 24000)

Note that you should configure heartbeats the same way on both the connecting and accepting sides.
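One way to keep both sides in sync is to define the heartbeat options once and reuse them. This is only a sketch: heartbeat-opts and with-heartbeats are hypothetical names, evt-loop stands for an event loop you created earlier, and the connect/accept calls are left in a comment block because they need net.async loaded.

```clojure
;; Shared heartbeat settings, using the same option keys as shown above.
(def heartbeat-opts
  [:heartbeat-period 8000 :heartbeat-timeout 24000])

(defn with-heartbeats
  "Hypothetical helper: appends the shared heartbeat options to a
   connect/accept argument list."
  [& args]
  (concat args heartbeat-opts))

(comment
  ;; Assumes (use 'net.async.tcp) and an existing evt-loop.
  (apply connect (with-heartbeats evt-loop {:host "127.0.0.1" :port 9988}))
  (apply accept  (with-heartbeats evt-loop {:port 9988})))
```

If client and server live in separate codebases, the same idea applies to a shared config file instead of a shared var.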

Sample echo server/client

(defn echo-server []
  (let [acceptor (accept (event-loop) {:port 8899})]
    (loop []
      (when-let [server (<!! (:accept-chan acceptor))]
        (go
          (loop []
            (when-let [msg (<! (:read-chan server))]
              (when-not (keyword? msg)
                (>! (:write-chan server) (.getBytes (str "ECHO/" (String. msg)))))
              (recur))))
        (recur)))))

(defn echo-client []
  (let [client (connect (event-loop) {:host "127.0.0.1" :port 8899})]
    (loop []
      (go (>! (:write-chan client) (.getBytes (str (rand-int 100000)))))
      (loop []
        (let [read (<!! (:read-chan client))]
          (when (and (keyword? read)
                     (not= :connected read))
            (recur))))
      (Thread/sleep (rand-int 3000))
      (recur))))
