shoreleave/shoreleave-pubsub

0.3.0


A smarter client-side with ClojureScript: Shoreleave's publish/subscribe system

dependencies

org.clojure/clojure
1.4.0
shoreleave/shoreleave-core
0.3.0
shoreleave/shoreleave-browser
0.3.0

dev dependencies

lein-marginalia
0.7.1



 
(ns shoreleave.pubsubs.crossdoc
  (:require [clojure.browser.net :as net]
            [goog.net.xpc.CrossPageChannel :as xpc]
            [shoreleave.pubsubs.protocols :as ps-protocols]))

LOOK AWAY

This is a work in progress and should not be used AT ALL

(def reg-once (atom #{}))

This is a pubsub system built upon the HTML5 Messaging API. It will allow for cross-document pubsub functionality.

Paul, you should use raw XPC, PortChannel (if you only want to support HTML5 postMessage), and look into using clojure.browser.net for the XPC interface.

(extend-type goog.net.xpc.CrossPageChannel
  ps-protocols/IMessageBrokerBus
  ;; Argument order follows IMessageBrokerBus: [bus topic handler-fn]
  (subscribe [bus topic handler-fn]
    (net/register-service bus (ps-protocols/topicify topic) handler-fn))

  (subscribe-once [bus topic handler-fn]
    (.subscribeOnce bus (ps-protocols/topicify topic) handler-fn))

  ;; Replace the registered service with a no-op handler
  (unsubscribe [bus topic handler-fn]
    (net/register-service bus (ps-protocols/topicify topic) #(identity nil)))

  (publish
    ([bus topic data]
     (.publish bus (ps-protocols/topicify topic) data))
    ([bus topic data & more-data]
     (.publish bus (ps-protocols/topicify topic) (into [data] more-data)))))
(defn subscribers-count [bus topic]
  (.getCount bus (ps-protocols/topicify topic)))

(defn bus []
  (net/xpc-connection))
 

An extended pub/sub implementation built on Google's EventTarget and Event system

(ns shoreleave.pubsubs.event
  (:require [goog.events :as gevents]
            [goog.events.EventTarget :as gevent-target]
            [clojure.browser.event :as event]
            [shoreleave.pubsubs.protocols :as ps-protocols]))

Below is an implementation of IMessageBrokerBus built upon Google Closure's EventTarget object

This effectively makes ALL EventTargets capable buses

This is the internal Event used for publishing. It ships data along with it

(defn publish-event [topic data]
  (let [e (goog.events.Event. topic)]
    (set! (.-data e) data)
    e))

Because we need to unpack events for the handler functions, we need a mapping of handler-fn -> #(handler-fn (.-data %)), so that unsubscribe can look up the wrapper that was actually registered as the listener.

(def handlers (atom {}))
(extend-type goog.events.EventTarget
  ps-protocols/IMessageBrokerBus
  (subscribe [bus topic handler-fn]
    (let [wrapped-handler-fn #(handler-fn (.-data %))]
      (swap! handlers assoc handler-fn wrapped-handler-fn)
      (event/listen bus (ps-protocols/topicify topic) wrapped-handler-fn)))

  (subscribe-once [bus topic handler-fn]
    (let [wrapped-handler-fn #(handler-fn (.-data %))]
      (swap! handlers assoc handler-fn wrapped-handler-fn)
      (event/listen-once bus (ps-protocols/topicify topic) wrapped-handler-fn)))

  (subscribe->
    ([bus handler-fn1 handler-fn2 handler-fn3]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3))
    ([bus handler-fn1 handler-fn2 handler-fn3 handler-fn4]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3 handler-fn4))
    ([bus handler-fn1 handler-fn2 handler-fn3 handler-fn4 handler-fn5]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3 handler-fn4 handler-fn5)))

  (unsubscribe [bus topic handler-fn]
    (event/unlisten bus (ps-protocols/topicify topic) (@handlers handler-fn)))

  (publish
    ([bus topic data]
     (event/dispatch-event bus (publish-event
                                 (ps-protocols/topicify topic)
                                 data)))
    ([bus topic data & more-data]
     (event/dispatch-event bus (publish-event
                                 (ps-protocols/topicify topic)
                                 (into [data] more-data)))))

  IHash
  (-hash [bus] (goog.getUid bus)))
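
The wrapper bookkeeping used by subscribe and unsubscribe above can be sketched without Closure. This is a minimal sketch in plain Clojure, assuming a hypothetical listener table in place of a real EventTarget; `listeners`, `wrappers`, `subscribe!`, `unsubscribe!`, and `dispatch!` are illustrative names, not part of Shoreleave:

```clojure
;; Hypothetical stand-in for an EventTarget: topic -> set of listener fns.
(def listeners (atom {}))
;; Mapping of the user's handler-fn -> the wrapper that was registered for it.
(def wrappers (atom {}))

(defn subscribe! [topic handler-fn]
  ;; The listener receives an event map; the user's handler only sees its :data.
  (let [wrapped (fn [event] (handler-fn (:data event)))]
    (swap! wrappers assoc handler-fn wrapped)
    (swap! listeners update-in [topic] (fnil conj #{}) wrapped)))

(defn unsubscribe! [topic handler-fn]
  ;; Look the wrapper up by the original handler, since that is all the caller has.
  (when-let [wrapped (@wrappers handler-fn)]
    (swap! listeners update-in [topic] disj wrapped)
    (swap! wrappers dissoc handler-fn)))

(defn dispatch! [topic data]
  (doseq [f (get @listeners topic)]
    (f {:type topic :data data})))

;; Usage: the handler sees only the first dispatch.
(def seen (atom []))
(defn record-data [d] (swap! seen conj d))

(subscribe! "t" record-data)
(dispatch! "t" 1)
(unsubscribe! "t" record-data)
(dispatch! "t" 2)
```

Unlike the code above, the sketch also drops the mapping entry on unsubscribe, which keeps the table from growing; that is a design choice of the sketch, not a description of Shoreleave's behavior.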

Get an event bus

(defn bus
  []
  (goog.events.EventTarget.))
 
(ns shoreleave.pubsubs.protocols)

Publish/Subscribe

Shoreleave allows you to compose your programs by declaratively binding functions, atoms, workers, and localStorage through a publish/subscribe bus.

The system is built upon two protocols: IMessageBrokerBus and IPublishable

IMessageBrokerBus

This protocol abstracts away the implementation details of the bus itself, enabling you to program against the interface instead of the implementation.

You can imagine different types of buses you might use: local buses, cross-document buses, buses that are a proxy for other servers...

The protocol defines the following functions:

  • subscribe - for a given bus, bind the handler-fn to the topic
  • subscribe-once - for a given bus, create a one-time binding between the topic and the handler-fn
  • subscribe-> - for a given bus, create a chain of subscriptions, similar to how the threading macro works
  • unsubscribe - for a given bus, unbind the handler-fn and the topic
  • publish - for a given bus, publish data to all the handler-fns bound to the topic
(defprotocol IMessageBrokerBus
  (subscribe [broker-bus topic handler-fn])
  (subscribe-once [broker-bus topic handler-fn])
  ;(subscribe-> [broker-bus & chain-handler-fns])
  ;; Protocols don't support variable args, back to the old impl
  (subscribe-> [broker-bus handler-fn1 handler-fn2 handler-fn3]
               [broker-bus handler-fn1 handler-fn2 handler-fn3 handler-fn4]
               [broker-bus handler-fn1 handler-fn2 handler-fn3 handler-fn4 handler-fn5])
  (unsubscribe [broker-bus topic handler-fn])
  (publish
    [broker-bus topic data]
    [broker-bus topic data more-data]))
(defn chain-subscriptions [bus & handler-fns]
  (let [subscripts (partition 2 1 handler-fns)]
    (when-not (empty? subscripts)
      (doseq [[t h] subscripts]
        (subscribe bus t h)))))
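
The pairing done by chain-subscriptions may be easier to see in isolation. The sketch below uses keywords as stand-ins for the handler functions and a hypothetical helper, `chain-pairs`, that only performs the `partition` step; each resulting pair corresponds to one `(subscribe bus topic handler)` call:

```clojure
;; Hypothetical helper: the same sliding-window pairing chain-subscriptions uses.
(defn chain-pairs [& handler-fns]
  (partition 2 1 handler-fns))

(chain-pairs :fetch :parse :render)
;; => ((:fetch :parse) (:parse :render))
;; i.e. :parse subscribes to :fetch, and :render subscribes to :parse

(chain-pairs :fetch)
;; => ()  a single function yields no pairs, so nothing is subscribed
```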

In addition to strings and keywords, functions can be used as topics, allowing for a pipeline-like binding to take place. For example:

(subscribe my-bus user-search render-search-results)

... where user-search is a function that returns some search results, and render-search-results takes search data and renders it to the DOM.

When user-search is called, the results are automatically rendered to the DOM. This frees the developer from having to litter DOM, view, and state logic throughout the application. Shoreleave's pubsub allows you to bind together pure functions to build out functionality.

Additionally, cross-cutting functionality (like logging) is more easily managed through the pubsub system.
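
As an illustration of the cross-cutting case, here is a minimal sketch in plain Clojure of a toy synchronous bus with a rendering subscriber and a logging subscriber on the same topic. `toy-bus`, `sub!`, and `pub!` are hypothetical names used only for this sketch; they are not Shoreleave's API, and the "rendering" just builds a string:

```clojure
;; A toy bus: an atom holding a map of topic -> vector of handler fns.
(defn toy-bus [] (atom {}))

(defn sub! [bus topic handler-fn]
  (swap! bus update-in [topic] (fnil conj []) handler-fn)
  bus)

(defn pub! [bus topic data]
  ;; Deliver data to every handler bound to the topic, in subscription order.
  (doseq [h (get @bus topic)]
    (h data)))

(def rendered (atom nil))
(def log (atom []))
(def my-bus (toy-bus))

;; The "view" subscriber and the logging subscriber know nothing about each other.
(sub! my-bus :search-results #(reset! rendered (str "<ul>" (count %) " results</ul>")))
(sub! my-bus :search-results #(swap! log conj [:search-results (count %)]))

(pub! my-bus :search-results ["a" "b"])
;; @rendered => "<ul>2 results</ul>"
;; @log      => [[:search-results 2]]
```

Logging is added by subscribing one more handler; neither the publisher nor the rendering handler has to change.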

IPublishable

The second protocol is used to define things that can be published, i.e., things that can be used as a topic.

Publishables are usually constructed as a decorator (which results in a Function object containing metadata), or as an adapter to IWatchable interfaces.

A Publishable knows how to make a string-based topic of itself (topicify), can tell if it has already been decorated (publishized?), and can generate the decorator form that is bound to a bus (publishize).

(defprotocol IPublishable
  (topicify [t])
  (publishized? [t])
  (publishize [t broker-bus]))
 
(ns shoreleave.pubsubs.publishable
  (:require [shoreleave.pubsubs.protocols :as ps-protocols]))

Publishables

Shoreleave comes with out-of-the-box support for the most common publishables

Functions and Function types

Functions need to be decorated (much like how memoize works). This is now supported by CLJS core - ported from Shoreleave.

For example:

 (def a-bus (simple-bus/bus))
 (defn some-fn [] 5)
 (def some-fn-p (publishize some-fn a-bus))

 (defn another-fn [x] (inc x))
 (def another-fn-p (publishize another-fn a-bus))

 (subscribe a-bus some-fn-p another-fn-p) ;; some-fn-p will automatically publish its result to another-fn-p
 ;; Note that the subscribing function (or entity) doesn't need to be `publishize`d

 (some-fn) ;; This DOES NOT get sent to the bus
 (some-fn-p) ;; The results of this call are published on the bus

Anything that is subscribed to some-fn-p will get the value 5 when the function is called, as shown above.
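
The decorator idea can be sketched in plain Clojure, independent of any bus. This is a simplified sketch, assuming a hypothetical `decorate` function that takes the publishing step as an argument (`publish-fn`); it mirrors the shape of a publishized function (a wrapper carrying `:sl-published` metadata) but is not Shoreleave's implementation:

```clojure
(defn decorate
  "Hypothetical decorator: wraps f so that every return value is also
  handed to publish-fn under the given topic."
  [f topic publish-fn]
  (with-meta
    (fn [& args]
      (let [ret (apply f args)]
        (publish-fn topic ret)
        ret))
    {:sl-published topic}))

;; Stand-in for a bus: just record what would have been published.
(def published (atom []))

(def five-p
  (decorate (fn [] 5) "five"
            (fn [topic v] (swap! published conj [topic v]))))

(five-p)
;; the call returns 5 as usual, and @published is now [["five" 5]]
```

The undecorated function is untouched, which is why calling it directly publishes nothing.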

Atoms

Atoms can also be topics. This is no different than watching the atom.

All subscribed functions will be passed a map: {:old some-val :new another-val}
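
The shape of that map comes straight from the watch function's arguments. A minimal sketch in plain Clojure, where the watch key `:my-bus` and the `published` atom are hypothetical stand-ins for the bus-derived key and the bus itself:

```clojure
(def published (atom []))
(def counter (atom 0))

;; A watch fn receives [key ref old-value new-value]; only the last two
;; are forwarded, packaged as {:old ... :new ...}.
(add-watch counter :my-bus
           (fn [_key _ref old-val new-val]
             (swap! published conj {:old old-val :new new-val})))

(swap! counter inc)
;; @published => [{:old 0, :new 1}]
```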

Browser storages (localStorage and sessionStorage)

All storage systems behave exactly like an atom, as described above

WorkerFn

Embedded workers behave exactly like atoms, as described above

The default case

You can also use strings and keywords as topics (the most useful case for cross-cutting functionality).

(extend-protocol ps-protocols/IPublishable

  function
  (topicify [t]
    (or (ps-protocols/publishized? t)
        (-> t hash str)))
  (publishized? [t]
    (:sl-published (meta t)))
  (publishize [fn-as-topic bus]
    (if (-> (meta fn-as-topic) :sl-buses (get (-> bus hash keyword)))
      fn-as-topic
      (let [published-topic (ps-protocols/topicify fn-as-topic)
            new-meta (assoc (meta fn-as-topic) :sl-published published-topic
                                               :sl-buses (-> (get (meta fn-as-topic) :sl-buses #{}) (conj (-> bus hash keyword))))]
        (with-meta (fn [& args] (let [ret (apply fn-as-topic args)]
                       (ps-protocols/publish bus published-topic ret)
                       ret))
                   new-meta))))

  Atom
  (topicify [t]
    (or (ps-protocols/publishized? t)
        (-> t hash str)))
  (publishized? [t]
    (-> t hash str))
  (publishize [atom-as-topic bus]
    (let [published-topic (ps-protocols/topicify atom-as-topic)
          bus-key (-> bus hash keyword)]
      (do
        (add-watch atom-as-topic bus-key #(ps-protocols/publish bus published-topic {:old %3 :new %4}))
        atom-as-topic)))

  ;; this could be a Fn that we attached metadata to - for some reason it gets hit like an obj, instead of a fn 
  object
  (topicify [t]
    (or (ps-protocols/publishized? t)
        (-> t hash str)))
  (publishized? [t]
    (:sl-published (meta t)))
  
  string
  (topicify [t] t)

  default
  (topicify [t]
    (name t)))
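
The string and default cases above reduce a topic to a plain string. A small sketch of that behavior in plain Clojure, using a hypothetical `topic-string` function in place of the protocol dispatch:

```clojure
;; Hypothetical stand-in for the `string` and `default` topicify cases.
(defn topic-string [t]
  (if (string? t)
    t
    (name t)))

(topic-string "search")      ;; => "search"
(topic-string :search)       ;; => "search"
(topic-string :user/search)  ;; => "search"
```

Note that `name` drops a keyword's namespace, so under this scheme `:search` and `:user/search` would end up on the same topic.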

Local Storage

It is expected that before calling this, you've handled your dependencies, a la (:require [goog.storage.mechanism.HTML5LocalStorage :as gls])

(defn include-localstorage! []
  (extend-type goog.storage.mechanism.HTML5LocalStorage
    ps-protocols/IPublishable
    (topicify [t]
      (or (ps-protocols/publishized? t)
          (-> t hash str)))
    (publishized? [t]
      (-> t hash str))
    (publishize [ls-as-topic bus]
      (let [published-topic (ps-protocols/topicify ls-as-topic)
            bus-key (-> bus hash keyword)]
        (do
          (add-watch ls-as-topic bus-key #(ps-protocols/publish bus published-topic {:old %3 :new %4}))
          ls-as-topic)))))

Session Storage

It is expected that before calling this, you've handled your dependencies, a la (:require [goog.storage.mechanism.HTML5SessionStorage :as glss])

(defn include-sessionstorage! []
  (extend-type goog.storage.mechanism.HTML5SessionStorage
    ps-protocols/IPublishable
    (topicify [t]
      (or (ps-protocols/publishized? t)
          (-> t hash str)))
    (publishized? [t]
      (-> t hash str))
    (publishize [ss-as-topic bus]
      (let [published-topic (ps-protocols/topicify ss-as-topic)
            bus-key (-> bus hash keyword)]
        (do
          (add-watch ss-as-topic bus-key #(ps-protocols/publish bus published-topic {:old %3 :new %4}))
          ss-as-topic)))))

This is left in the code for historical reasons. You can use it as an example of how to build custom function decorators in ClojureScript, correctly use the Blob system, generate new Object URLs, and see how WebWorkers execute.

 

An extended pub/sub implementation built on Google's PubSub Object

(ns shoreleave.pubsubs.simple
  (:require [goog.pubsub.PubSub :as pubsub]
            [shoreleave.pubsubs.protocols :as ps-protocols]))

Below is an implementation of IMessageBrokerBus built upon Google Closure's PubSub object. This will have all the properties of PubSub (i.e., synchronous).

(extend-type goog.pubsub.PubSub
  ps-protocols/IMessageBrokerBus
  (subscribe [bus topic handler-fn]
    (.subscribe bus (ps-protocols/topicify topic) handler-fn))

  (subscribe-once [bus topic handler-fn]
    (.subscribeOnce bus (ps-protocols/topicify topic) handler-fn))

  (subscribe->
    ([bus handler-fn1 handler-fn2 handler-fn3]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3))
    ([bus handler-fn1 handler-fn2 handler-fn3 handler-fn4]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3 handler-fn4))
    ([bus handler-fn1 handler-fn2 handler-fn3 handler-fn4 handler-fn5]
     (ps-protocols/chain-subscriptions bus handler-fn1 handler-fn2 handler-fn3 handler-fn4 handler-fn5)))

  (unsubscribe [bus topic handler-fn]
    (.unsubscribe bus (ps-protocols/topicify topic) handler-fn))

  (publish
    ([bus topic data]
     (.publish bus (ps-protocols/topicify topic) data))
    ([bus topic data & more-data]
     (.publish bus (ps-protocols/topicify topic) (into [data] more-data))))

  IHash
  (-hash [bus] (goog.getUid bus)))

Given a bus and a topic, return the number of subscribers (registered handler functions)

(defn subscribers-count
  [bus topic]
  (.getCount bus (ps-protocols/topicify topic)))

Get a simple bus

(defn bus
  []
  (goog.pubsub.PubSub.))
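
To make the "synchronous" property mentioned above concrete, here is a minimal sketch in plain Clojure of a synchronous publish: every handler has run by the time the publish call returns. `handlers`, `sub!`, and `pub!` are hypothetical names for this sketch and do not model goog.pubsub.PubSub beyond that one property:

```clojure
(def handlers (atom {}))
(def trace (atom []))

(defn sub! [topic handler-fn]
  (swap! handlers update-in [topic] (fnil conj []) handler-fn))

(defn pub! [topic data]
  ;; Handlers are invoked inline, on the caller's stack.
  (doseq [h (get @handlers topic)]
    (h data)))

(sub! "t" #(swap! trace conj [:handled %]))

(swap! trace conj :before)
(pub! "t" 1)
(swap! trace conj :after)
;; @trace => [:before [:handled 1] :after]
```

One consequence of synchronous delivery is that a slow handler delays the publisher, so handlers on such a bus are best kept short.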