Sentities Overview: Twitter->CoreNLP & Async Clojure


categories: clojure sentimentanalysis nlp socialmedia

Introduction

Sentities is a Clojure project that connects to Twitter’s free public streaming API and pulls the random-sample tweet stream. Stanford CoreNLP is then applied, via my damionjunk.nlp Clojure library, to extract named entities and a sentiment score.

There isn’t anything fancy going on here. The aim of this blog post, and of the Sentities project, is to give a flavor for how one could go about doing these sorts of tasks with Clojure.

The Project

The following block diagram gives a high-level view of what’s going on in this project.

There are three async channels and three async consumers. The number of threads per consumer is variable: IO from the streaming connection may need to be handled right away, and we don’t want buffer overruns while we wait to convert text to JSON and run the NLP analysis.

We can think of this as a pipeline where text moves from Twitter and has some computation performed on it until the refined version is stored in some aggregated form:

(-> (get-tweet)
    (convert-to-json)
    (annotate-entities-and-sentiment)
    (aggregate-relevant-data))

core.async makes it easy to implement the above sketch in a way that is both simple to manage and performant. See damionjunk.sentities.work for the specifics, but briefly:

;; Assumes (:require [clojure.core.async :as async]) in the ns declaration.
(defonce t-text-in-chan (async/chan))
(defonce t-json-out-chan (async/chan))
(defonce t-annotated-out-chan (async/chan))

We create three channels, one for each of the output types that we want to process in parallel.
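Note that (async/chan) with no arguments creates an unbuffered channel, so every put blocks until a consumer is ready to take. As a variation (this is not what Sentities does), you could give a channel a fixed buffer to absorb bursts from the stream:

;; Variation, not the project's code: a fixed buffer of 1024 messages gives
;; the pipeline some slack during bursts; puts only block once it fills up.
(defonce t-text-in-chan (async/chan 1024))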

We then need to create three consumers. I’ll just show the code for one of them here; you can pull the project from GitHub to see the full example:

;; A function that creates some async threads:
(defn start-json-consumers
  "Starts N consumers who will eat JSON and annotate with the Stanford NLP."
  [n]
  (dotimes [_ n]
    (async/thread
      (while @running?
        (let [jdata (async/<!! t-json-out-chan)
              adata (annotate-tweet jdata)]
          ;; Write the annotated result to the annotated channel;
          ;; tweets that failed to annotate (nil) are dropped.
          (when adata (async/>!! t-annotated-out-chan adata)))))))


;; Later on, in an init function, we do something like:
;; create 4 json/nlp consumers of the JSON tweets
(start-json-consumers 4)

The important bits are that we have an atom called running?, which is just a boolean switch that we can flip to turn off the stream; the program will run forever otherwise! The parameter to (start-json-consumers) is an integer telling it how many threads you want. I picked 4 arbitrarily, but in this case there will be 4 threads ready to consume from the JSON channel t-json-out-chan and write annotated information to t-annotated-out-chan.
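The running? atom itself is nothing more than a boolean in an atom. A minimal sketch (the stop! helper is my own name for the idea, not necessarily what the project calls it):

(defonce running? (atom true))

;; Flip the switch so each consumer loop exits after its next take.
(defn stop! [] (reset! running? false))

One subtlety: a consumer blocked in (async/<!! ...) won’t see the flag until one more message arrives, so shutdown is graceful rather than instantaneous. Using async/thread rather than a go block is also the right call here, since the CoreNLP annotation is CPU-heavy, blocking work.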

The real decision to be made is what to do with this pile of annotated tweets. We’re filling t-annotated-out-chan with NLP’d tweets, so we should probably do something with them. In Sentities I took an easy approach, since this code is just a proof of concept / example.

We have an annotated tweet consumer:

(defn start-annotated-consumers
  "Does something with the 'Annotated' tweets we've generated."
  [n]
  (dotimes [_ n]
    (async/thread
      (while @running?
        (let [adata    (async/<!! t-annotated-out-chan)
              smaps    (:sentiment-ner adata)
              entities (mapcat entities-mfn smaps)]
          (doseq [se entities]
            ;; Update the `sentities` atom/map.
            (let [ek (:entity se)
                  se (-> se (dissoc :entity) (assoc :count 1))]
              ;; +-merge the modified `se` map, which contains the sentiment
              ;; and a count, so we can track entity frequency and the
              ;; average sentiment surrounding the entity.
              (swap! sentities update-in [ek] (partial merge-with +) se))))))))

Here we are pulling the :sentiment-ner data and generating a seq of {:sentiment :entity} maps. If an entity is either an organization or a person, we associate the sentiment of its sentence with the entity. (Note: this is a bad assumption to make, but like I said, this project is just for #funzone #clojuretime.)

The function below is mapped over the :sentiment-ner maps contained in the data consumed from the t-annotated-out-chan channel:

(defn entities-mfn
  "A mapping function that takes maps like:

   {:sentiment 2,
    :text \"Google it.\",
    :tokens ({:pos \"NNP\", :ner \"ORGANIZATION\", :token \"Google\"}
             {:pos \"PRP\", :ner \"O\", :token \"it\"}
             {:pos \".\", :ner \"O\", :token \".\"})}

   and returns a sequence of maps like:

   ({:sentiment 2 :entity \"Google\"} ...)"
  [smap]
  ;; Keep only ORGANIZATION and PERSON tokens, pairing each with the
  ;; sentence-level sentiment score.
  (keep (fn [{ner :ner t :token}]
          (when (or (= "ORGANIZATION" ner) (= "PERSON" ner))
            {:sentiment (:sentiment smap) :entity t}))
        (:tokens smap)))
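For example, feeding it the data from the docstring at the REPL:

(entities-mfn {:sentiment 2
               :text "Google it."
               :tokens [{:pos "NNP" :ner "ORGANIZATION" :token "Google"}
                        {:pos "PRP" :ner "O" :token "it"}
                        {:pos "." :ner "O" :token "."}]})
;; => ({:sentiment 2, :entity "Google"})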

Finally, we take this seq of sentiment/entity maps and aggregate into an atom with count information, so we can track the number of times each entity is mentioned and the average sentiment for that entity:

(swap! sentities update-in [ek] (partial merge-with +) se)

As seen in the code above, the se map is a map that looks like:

{:count 1 :sentiment 3}

We’re using this data structure so we can do a convenient (update-in) and (partial merge-with +) call to update our atom in the (swap!) call.
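You can see the aggregation behavior directly at the REPL. Merging two entity maps with + sums both the running sentiment total and the count, and a key that isn’t in the map yet simply gets the new map as-is:

(merge-with + {:sentiment 5 :count 2} {:sentiment 3 :count 1})
;; => {:sentiment 8, :count 3}

(update-in {} ["Google"] (partial merge-with +) {:sentiment 3 :count 1})
;; => {"Google" {:sentiment 3, :count 1}}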

The atom sentities now contains maps that look like:

{"Google" {:sentiment 40 :count 10}}

;; This example entry would represent an average sentiment of 40/10, or 4
;; (very positive on CoreNLP's 0-4 scale).

Example “Output”

There isn’t actually any output generated by the program (aside from log warnings). After letting the program run for a while, you can examine the atom that is used for data aggregation:

(let [results  @swork/sentities
      sresults (into (sorted-map-by
                       (fn [k1 k2]
                         (compare [(:count (get results k1)) k1]
                                  [(:count (get results k2)) k2])))
                     results)]
  ;; We want the top 5, sorted by :count
  (clojure.pprint/pprint (map (fn [[k {numer :sentiment denom :count}]]
                                {:entity k
                                 :sentiment (* 1.0 (/ numer denom))
                                 :count denom})
                              (take-last 5 sresults))))

This is what we see:

{:entity "Calum", :sentiment 1.411764705882353, :count 21}
{:entity "Duggar", :sentiment 1.117647058823529, :count 24}
{:entity "Justin", :sentiment 1.259259259259259, :count 27}
{:entity "Obama", :sentiment 1.444444444444444, :count 27}
{:entity "Google", :sentiment 1.25, :count 28}

Next Steps

Pull the code from the GitHub page: damionjunk.sentities. There is a comment block in damionjunk.sentities.sentities with some REPL-based code to execute. You will need to get OAuth credentials from your Twitter account for this to work.

There is a (configure) call that expects to find an EDN file somewhere with the following filled out:

{:sentities {:oauth {:user-access-token        ""
                     :user-access-token-secret ""
                     :app-consumer-key         ""
                     :app-consumer-secret      ""}}}

In my case, the full file path is:

/Users/djunk/.config/damionjunk/damionjunk.pub.edn

But you can put the file anywhere. This is just a personal convention.
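Reading such a file is just slurp plus EDN parsing. A minimal sketch of the idea (an assumption on my part, not necessarily how the project’s (configure) is implemented):

(require '[clojure.edn :as edn])

;; Read the OAuth settings out of an EDN config file at `path`.
(defn read-oauth-config [path]
  (-> (slurp path)
      edn/read-string
      (get-in [:sentities :oauth])))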

If you would like to discuss this further, get in touch! I’d love to hear from you.