April 27, 2013

Letters from the Future: Challenging Google’s Search Engine

A previous version of this article was posted on Duck Duck Go reddit, where the user _zekiel pointed out that DDG currently uses the two-level search we had proposed.

Google is the undisputed search leader (88% market share in the US1). Google is not only ahead of competitors in terms of quality of search results, infrastructure worthy of science fiction, and computer science research. Another of their strengths is how quickly they apply their own research.

How can Google be dethroned? Sure, there are other search engines, and newcomers like Blekko and Duck Duck Go make headlines from time to time. However, when you look more closely at those other search engines, you find that they cannot seriously compete with Google.

Benchmarking Search Engines

A search for “reverse engineering” on Blekko returns a hundred thousand results, while the same search on Google returns approximately two million. If it is so difficult for Blekko to compete at the crawling level, imagine what happens in the rest of the search engine pipeline. Just looking at Google’s search quality reports  tells you that Page Rank was only the catalyst for much more sophisticated algorithms.

Duck Duck Go manages to attract a geeky audience with highlighted features like putting privacy first. If we search for “reverse engineering” on Duck Duck Go the results seem wacky: the second result is http://reverseengineeringinc.com/ a content poor site which just has the right domain name.

Google appears to be in a league by itself. It currently seems unlikely that they could lose significant market share due to an engineering weakness. In order to outdo Google, we must think holistically and try to guess how the web as a whole will evolve over the next ten years.

A Holistic Approach

Duck Duck Go created a two level search engine for sites like Wikipedia or YouTube. DDG offers DuckDuckGo Instant Answer API to incorporate the search engines of third parties. In order to take advantage of DDG and other two-tiered search engines, sites will have to improve their local search. Currently if you search using the local site search inside Stack Overflow, for example, the results are much lower quality than the same query in Google restricted to stackoverflow.com. When each site understands its own data better than Google, its internal search results will surpass Google’s. Google will no doubt continue to provide better global results, but the two-tiered search would decentralize efforts to improve algorithms. It is important to note that this solution does not need to be distributed: sites can share their local indexes and ranking algorithms with the routing search engine.

The fact that a small number of sites receive the majority of Internet traffic means that optimizing the top sites for a two layer search would make a big difference.

Notes

  1. Search Engine Market Share

Additional Resources

  1. Can a small search engine take on Google?
  2. Google’s Weakness, AltaVista’s Strength [2002]
  3. Is the Internet driving competition or market monopolization?
  4. Media and Internet concentration in Canada

See Also

  1. Reverse Engineering and The Cloud
  2. Egont, A Web Orchestration Language
  3. Helping Search Engines to Find Content in the Invisible Web
  4. The Data Portability Fact Sheet
February 28, 2013

Parsing S-Expressions in C# using OMeta

It is easy to parse S-Expressions in C# with OMeta. Our code limits the grammar to lists, and atoms of string, symbol, and number types. So, it is not complete, but it can easily be expanded with OMeta. What motivated me to write this article was the lack of publicly available S-Expression parsers in C#/.NET.

Our parser converts the expression (+ (* 3 4 5 6) (- 7 1) ) to the following tree:

parsed-s-expression
where each vertex is represented by a C# class containing an ArrayList, Symbol, String, or Integer. Note that the expression (1) is different from the expression without parenthesis. The first is a list with one atom and the other is just the atom.

S-Expressions are a compact way to express programs and data structures. They were first defined for Lisp, but are used in a variety of areas including public key infrastructure. We use S-Expressions to define data flows in Egont, our web orchestration language. In Egont, each S-Expression produces a tree which is converted into a directed acyclic graph, the subject of a future post.

OMeta can be used under C# via the OMeta# project. That makes it more interesting since the classical lexical analyzer and parser generators such as Lex/flex and Yacc/GNU bison do not produce C# code. ANTLR is an interesting alternative but at the time of this post the latest version, ANTLR 4, does not support C#. OMeta’s ability to deal with ambiguities makes it more suited to playing with grammars. However, there are performance penalties in OMeta which must be taken into account.

Code

The code is available as SExpression.NET [github.com].

  1. Compile the RebuildParser project first
  2. Run the Test project
  3. The SExpression project contains the SExpression.ometacs parser and its related C# classes

See Also

  1. Egont, a [Social] Web Orchestration Language
  2. Egont Part II

Additional Resources

  1. IronMeta: another OMeta implementation in C#
  2. YaYAML: a YAML parser written in OMeta#
  3. OMeta Performance
  4. Domain-Specific Languages: An Annotated Bibliography

January 21, 2013

Searching for Substrings in Streams: a Slight Modification of the Knuth-Morris-Pratt Algorithm in Haxe

It is odd that the base libraries for most programming languages do not allow you to search for regular expressions and substrings in streams or partial reads. We have modified the KMP algorithm so that it accepts virtually infinite partial strings. The code is implemented in Haxe, so it can generate code in multiple programming languages.

Streams are important when working with data that does not fit in main memory, such as large files, or with data which is being transferred. There are a few implementations of regular expressions and substrings matching. One is the Jakarta Regexp, now retired and resting in the Apache Attic. The Jakarta Regexp library “match” method in the RE class uses a CharacterIterator as a parameter. In C++, Boost.Regex implements partial matches.

Our code is implemented in Haxe so the same code can target Javascript, ActionScript, Flash SWF, NekoVM, PHP, C++, C#, and Java. We really like the concept of writing one code and expanding it to a variety of platforms with minimum effort. There are excellent libraries in specific environments that can work perfectly in other environments. Porting libraries from one programming language to another is tedious. For example, the amazing NetworkX graph library implemented in Python can be easily ported to C# to benefit a broader audience.

Code

Prerequisites

  1. Haxe (tested on version 2.10)
  2. For C++: hxcpp (run haxelib install hxcpp)
  3. For Java: hxjava (run haxelib install hxjava)
  4. For Mono/C#: jxcs (run haxelib install hxcs)

Source code available on github.

See Also

  1. Parsing S-Expressions in C# using OMeta
  2. Esoteric Queue Scheduling Disciplines

Resources

  1. Knuth-Morris-Pratt string matching
  2. Text Searching: Theory and Practice
  3. Boyer–Moore–Horspool algorithm
  4. Rabin–Karp algorithm
  5. Aho–Corasick string matching algorithm
  6. Lexicographically minimal string rotation
  7. Efficient way to search a stream for a string
August 16, 2012

Enriching a List of URLs with Google Page Rank

Dealing with a large body of web resources can be daunting. You make a list of hundreds of blogs, but how do you share or recall those resources later? You must somehow organize your list. Many people do this with tags, but this is not necessarily the best option. Manual organization is also tedious, so tools for enriching data automatically came in handy. The relevance of different resources changes over time. What we originally tagged as “breakthrough” may come insignificant.

Last week I saw a friend who had recently started a new job and wanted my opinion about current and future technological trends. I wanted to give him links to thousands of resources that I have been accumulating over the years, but organized in such a way that he would not have to view them one at a time. This triggered an avalanche of ideas about how to enrich lists of links. My first thought was to rank my list of sites about venture capital and data science using Google Page Rank. I also considered adding the number of tweets, likes, and “+1” for each site but these are generally awarded for individual articles, not whole sites. I ended up adding the Google Page Rank with project pagerank.

The most interesting ideas to explore, though, are in another direction: how to boost items that are in the long tail. The best music may not make the Top 40, and so remains invisible. Algorithms better at recognizing value in the long tail would revolutionise the economy.

The code is available on github. Two examples of the output are available on data-science-bundle and venture-capital-bundle.

See Also

  1. Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website
  2. Exporting StackOverflow users blogs to Excel
  3. Data Science Resources
March 29, 2012

Esoteric Queue Scheduling Disciplines

New Challenges Requires New Tools

Big Data challenges current message oriented middleware (MOM) applications. MOM usually works with FIFO and priority scheduling disciplines. What happens if there is a large list of URLs ready to be crawled but you want to give URLs at the end of the list a chance of being crawled earlier? This concept comes from genetics, and is used in genetic algorithms selection schemes. The last URLs may contain interesting new resources in spite of their order or priority. Consuming from a FIFO queue takes a long time to crawl these URLs. Priority scheduling is more helpful, but it is not possible to know apriori how useful a URL will end up being in the quest for new Internet resources. Why not add a chance factor to URL selection by using roulette wheel scheduling and an efficient algorithm?

Data flows follow an order of execution based on task dependencies. One task cannot start until the preceding tasks have finished. This is the way a spreadsheet works. A change in a cell triggers a series of processes to be completed in topological order. Why not add task dependency to MOM applications? An item can be consumed from the queue only if its precedent tasks have been completed. We provide some data flow resources at the end of the Egont Part II article. However, a new queue scheduling discipline could be used in place of a separate framework. Ideally, the new queue discipline would include features such as persistence and transactions.

Roulette Wheel Scheduling Algorithm Design

To the best of our knowledge, there are currently no Internet resources about using a roulette wheel scheduling discipline for a queue.

The external interface for a roulette wheel queue is the same as for a typical queue with “get” and ”put” methods except that the “put” method incorporates a new probabilistic parameter. Probabilities can be expressed as integers. When a consumer requests an item, a random number is generated to decide which item is selected. Items with higher probabilities have a greater chance of being retrieved, but even items with low probabilities can be consumed.

The implementation of an efficient roulette wheel queue is not easy. Genetic algorithms use roulette wheel selection to choose between a small set of alternatives. A queue used for crawling can contain a lot of URLs, and the question is how to take these processes into account in order to find, add, and remove URLS efficiently.

Finding an item in a roulette wheel data structure is O(n) for trivial traversing and O(log(n)) using a binary search. Adding an item is trivial, it can be added at the end and the new total is the sum of the previous probability parameters total plus the new item’s probability parameter. Removing an item is more difficult. The trivial, but not the best, solution is to recalculate all the partial sums after the element which is being removed. A better solution is to use a heap tree data structure or one of its variants.

An alternative that merits further study is the use of Fenwick trees. In 1994, Peter M. Fenwick discovered how to improve the finding, adding, and modifying of items and the calculation of their subtotals. Since the Fenwick tree works over a fixed range of items, item keys must be preallocated.

See Also

  1. Using Queues in Web Crawling and Analysis Infrastructure
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue
  4. Ideas: Egont, A Web Orchestration Language

Resources

  1. A New Data Structure for Cumulative Frequency Tables
  2. Select random k elements from a list whose elements have weights and the roulette wheel answer
  3. A comparative analysis of selection schemes used in genetic algorithms
  4. A Framework for Alternate Queueing: Towards Traffic Management by PC-UNIX Based Routers
  5. Stack and Queue Layouts of Directed Acyclic Graphs
  6. Dynamic Data Structures for Taskgraph Scheduling Policies with Applications in OpenCL Accelerators

Photo taken by Kristofer Björkman

March 27, 2012

Adding Acknowledgement Semantics to a Persistent Queue

Persistence capability is not enough to ensure the reliability of message oriented middleware. Suppose that you retrieve an item from a queue, and the application or thread crashes in the middle of the process. The item and processes depending on it will be lost, since the crash occurred after retrieving the item from the queue. Acknowledgement semantics can prevent this loss If the application crashes before acknowledging an item. This item will continue to be available to other consumers until an acknowledgment is sent.

This Python code shows how to add acknowledgement to a class derived from the Python Queue class. In the article Persisting Native Python Queues we only show how to persist a queue. It is important to note that we have modified the base Python Queue class, adding the “connect” and “ack” methods. Each application thread must call the “connect” method before using the queue object. The “connect” method returns a unique queue proxy. If the thread crashes, the items that have been fetched, but not acknowledged, in this queue are enqueued again. The “ack” method uses the item returned by the “get” method and effectively removes the item from the queue. In this code ZODB is used for persistence instead of DyBASE. If the entire application crashes, not just a single thread, unacknowledged items are requeued when it restarts.

While acknowledgement semantics increases reliability, it is not infallible. Imagine that after processing an acknowledged item, the result of the process is also added to the queue. In some web crawling implementations, first a URL is retrieved from a queue and acknowledged, then an HTML page is fetched from that URL, and finally the links on that page are inserted in the queue. Two problems can occur if the application or thread crashes during this process. If items, in this case URLs, are acknowledged and thus eliminated as soon as they are retrieved, they may be eliminated before enqueuing all of the links on the page. In this case, the remaining links will be lost. If, on the other hand, items are acknowledged only after enqueuing all the links, some links will be duplicated. This conflict is solved with queue transaction semantics. If the process or thread crashes a rollback is performed.

Notes

  1. This persistent queue with acknowledgement assumes that the objects in the queue all have different identities, id(obji) != id(objj) for all i,j. Making a copy of the object works for mutable objects. Immutable objects must be wrapped.
  2. The object classes in the queue must inherit from the Persistent class, including object members.

Prerequisites

  1. Python 2.x (x>=6)
  2. ZODB3

Code

The code is available at github and includes a series of unit tests.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Persisting Native Python Queues

Resources

  1. AMQP Acknowledgement
  2. HornetQ Asynchronous Send Acknowledgements
  3. HornetQ Transactions
  4. ZODB In Real Life
  5. Storing Persistent Objects with Persistent Objects as attributes of the Parent PO

Photo taken by Paul Downey

February 23, 2012

Persisting Native Python Queues

Native Python queues do not allow you to stop and resume an application without loosing queue items. Adding persistence of objects to a derived Python Queue class partially addresses this issue. We use the DyBASE embedded object oriented database to persist queues items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue, it works in multithreading environments. Transactional support such as acknowledging successfully processed queue items is not currently a feature of this class.

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevancy of queues to connect heterogeneous technologies. Queues are also used in the context of a single technology to follow the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues persists. The Microsoft Windows Azure platform incorporates persistant queues but has other limitations, and also may be overkill for your solution.

There are several ways to persist a queue. If the items that you want to persist have a fixed buffer length then Berkeley DB’s queues or STXXL’s queues work well. You can’t use database managers like GDBM if you need a FIFO queue since you need to traverse the elements in order and the hash table does not assure this order. STXXL, and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite which can be useful in many scenarios, but an SQL engine adds complexity that is not required for FIFO queues.

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available at github.

#!/usr/bin/python

from Queue import Queue
import dybase
import sys

MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1

#DEBUG = True
DEBUG = False

class Root(dybase.Persistent):
	def __init__(self):
		self.start = 0
		self.stop = 0

class SizeOfPersistentQueueExceeded(Exception):
	pass

class incomplete_persistent_deque:
	def __init__(self, filename):
		self._init_db(filename)

	def _init_db(self, filename):
		self.db = dybase.Storage()
		if self.db.open(filename):
			self.root = self.db.getRootObject()
			if self.root == None:
				self.root = Root()
				self.root.elements = self.db.createIntIndex() # createLongIndex can be used on 64 bits systems but it is strange to pass 2**32 elements in the queue
				self.root.pending_elements = self.db.createIntIndex()

				self.db.setRootObject(self.root)
				self.db.commit()
			else:
				if DEBUG:
					print "self.root already exists"

		if DEBUG:
			print "self.root.start =", self.root.start
			print "self.root.stop = ", self.root.stop

	def __len__(self):
		if self.root.stop >= self.root.start:
			return self.root.stop - self.root.start
		else:
			return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)

	def append(self, item):
		# add element to index
		self.root.elements.insert(self.root.stop, item)
		self.root.stop += 1
		if self.root.stop > MAX_INT:
			# check also if stop touches start
			self.root.stop = MIN_INT

		if self.root.start == self.root.stop:
			raise SizeOfPersistentQueueExceeded

		# persist
		self.root.store()
		self.db.commit()

	def popleft(self):
		# don't check if empty, Queue class take charge of that
		# remove element from index
		item = self.root.elements.get(self.root.start)
		self.root.elements.remove(self.root.start)
		self.root.start += 1
		if self.root.start > MAX_INT:
			# check also if start touches stop
			self.root.start = MIN_INT 

		if self.root.start == self.root.stop: # if queue is empty resync start & stop to 0. It is for beautifier purposes can be removed.
			self.root.start = 0
			self.root.stop = 0

		# persist
		self.root.store()
		self.db.commit()

		return item

class PersistentQueue(Queue):
	def __init__(self, filename, maxsize = 0):
		self.filename = filename
		Queue.__init__(self, maxsize)

	def _init(self, maxsize):
		# original: self.queue = deque()

		# incomplete_persistent_deque:
		# - incomplete implementation but enough for Queue:
		# - implemented methods:
		# -- __len__
		# -- append
		# -- popleft
		#

		self.queue = incomplete_persistent_deque(self.filename)

	def connect(self): # to handle failovers
		pass

	def ack(self):
		pass

	#def ack(self, item):

class ElementTest:
	def __init__(self, value):
		self.value = value

	def __repr__(self):
		return self.value

	def __str__(self):
		return self.value

def test1():
	q = PersistentQueue("myqueue.dbs")
	if not q.empty(): # get pending items
		while not q.empty():
			e = q.get()
			print e

	for s in ['one', 'two', 'three']:
		q.put(ElementTest(s))

def main(): # run this script twice to see the persisted elements
	test1()

if __name__ == '__main__':
	main()

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template
February 17, 2012

Using Queues in Web Crawling and Analysis Infrastructure

Message oriented middleware (MOM) is a key technology for implementing a custom pipeline and analyzing unstructured data. The pipeline for going from crawling web pages to part of speech tagging (PoST) and beyond is long. It requires a variety of processes which are implemented in several different programming languages and operating systems. For example, boilerpipe is an excellent Java library for extracting main text content while PoSTs libraries, like NLTK or FreeLing, are implemented in Python.

One might be tempted to integrate different technologies using web services but web services alone have many weak points. If the pipeline has ten processes and, for example, the last one fails, then the intermediate processes can be lost if they are not persisted. There must be a higher level mechanism in place to resume the pipeline processing. MOMs ensure message persistence until a consumer acknowledges that a specific process has finished.

There are a lot of MOMs to choose from, including commercial and free open source variants. Some features are present in almost all of them while others are not. Contention management is an important feature if you are dealing, as is likely, with a high ratio of messages produced to messages consumed at any one time. For example, a web crawler can fetch web pages at an incredibly high speed while processes like content extraction take longer. Running a message queue without contention management under these circumstances will exhaust the machine’s memory.

While MOMs are important for uniting heterogeneous technologies, the different processes must also know which queues to utilize to consume the input and produce the output for the next phases. A new wave of frameworks like NServiceBusResque, Celery, and Octobot has emerged to handle this.

In conclusion, MOMs help to connect heterogeneous technologies and bring robustness, and are very useful in the context of unstructured information like text analysis. Many MOMs are available, but there is not a single one with a complete feature set. However some of these features can be supplied by frameworks such as NServiceBus, Resque, Celery, and Octobot.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. Message Queues vs Web Services
  2. Message Queue Evaluation Notes
  3. The Hadoop Map-Reduce Capacity Scheduler
  4. Contention Management in the WSA
  5. Message Queuing Architectures
February 2, 2012

Integrating Google Analytics into your Company Loop with a Microsoft Excel Add-on

 

Introduction

Google Analytics and AdWords are essential marketing and sales tools. They can be integrated with the ubiquitous Microsoft Excel with the Google Data API. Data Big Bang’s Nicolas Papagna has developed an Excel add-on which can be downloaded here. This plugin enables Excel users to quickly retrieve Google Analytics data using the available Google Analytics metrics, and dimensions, and may also be sorted by the user’s criteria. One of the advantages of our solution is that Excel accesses the Google Analytics API directly instead of accessing it thru Data Big Bang server. Other solutions need access to your information which this exposes your private data to third parties.

Installation and Usage

  1. Download GoogleAnalyticsToExcel.AddInSetup_1.0.20.0.exe.
  2. Install it.
  3. Run Microsoft Excel.
  4. Configure your Google credentials by clicking on “Settings” under the “Google Analytics to Excel Addin” ribbon tab.
  5. Customize your query and retrieve your Google Analytics data by clicking “Query Google Analytics” button.

Development Notes

Data Big Bang’s research team has also developed an OData web service that can be consumed using applications such as PowerPivot, Tableau and LINQPad. This web service doesn’t require any add-ons. However, since unfortunately neither PowerPivot nor Tableau offer query builders to interact with OData providers, users must know how to craft the OData URL query themselves. The most interesting part of this project was developing a Google Data Protocol to Open Data Protocol .NET class that offers an IQueryable interface to convert LINQ queries to GData. LINQ queries add a lot of expressive power beyond GData.

See Also

  1. Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website
  2. Integrating Dropbox with Microsoft Outlook
  3. Exporting StackOverflow users blogs to Excel Hyperlinks
January 4, 2012

Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website

Searching for Sales Leads

The best definition of “marketing” I have read is by Dave Kellog in the What the CEO Really Thinks of Marketing (And 5 Things You Can Do About It) presentation. He says that marketing exists to make sales easier. For example, the process of searching for sales opportunities can be optimized if we pay attention to what our prospectives and current customers are sharing on different social media. Good corporate blogs include insightful information about the company’s aims. The first step in this direction is to discover what web resources a specific company has available. The discovery process is easier for companies than for individuals. Individuals uses a variety of aliases and alternative identities on the web. while companies with good communication strategies provide links to all of their web resources on their primary sites.

Discovery

We offer a script which retrieves web resources connected to any company’s URL. With this tool you will no longer waste time manually searching for this useful information. Companies and people usually have a number of associated sites: blogs; LinkedIn accounts; Twitter accounts; Facebook pages; and videos and photos on specialized sites such as YouTube, Vimeo, Flickr, or Picassa. A recursive level of page crawling is needed to retrieve the location of associated resources. Large companies such as IBM or Dell have multiple accounts associated with different areas. IBM has different Twitter accounts for their research divisions and for the important corporate news.

Usage

fwc.py <input.yaml> <output.yaml>

Look at data-science-organizations.yaml for an example.

Prerequisites

  1. Python 2.7 (or greater 2.x series)
  2. lxml.html
  3. parse_domain.py
  4. PyYAML

Script

This code is available at github.

fwc.py

#!/usr/bin/python2.7

import argparse
import sys
from focused_web_crawler import FocusedWebCrawler
import logging
import code
import yaml
from constraint import Constraint

def main():
   logger = logging.getLogger('data_big_bang.focused_web_crawler')
   ap = argparse.ArgumentParser(description='Discover web resources associated with a site.')
   ap.add_argument('input', metavar='input.yaml', type=str, nargs=1, help ='YAML file indicating the sites to crawl.')
   ap.add_argument('output', metavar='output.yaml', type=str, nargs=1, help ='YAML file with the web resources discovered.')

   args = ap.parse_args()

   input = yaml.load(open(args.input[0], "rt"))

   fwc = FocusedWebCrawler()

   for e in input:
      e.update({'constraint': Constraint()})
      fwc.queue.put(e)

   fwc.start()
   fwc.join()

   with open(args.output[0], "wt") as s:
      yaml.dump(fwc.collection, s, default_flow_style = False)

if __name__ == '__main__':
   main()

focused-web-crawler.py

from threading import Thread, Lock
from worker import Worker
from Queue import Queue
import logging

class FocusedWebCrawler(Thread):
   NWORKERS = 10
   def __init__(self, nworkers = NWORKERS):
      Thread.__init__(self)
      self.nworkers = nworkers
      #self.queue = DualQueue()
      self.queue = Queue()
      self.visited_urls = set()
      self.mutex = Lock()
      self.workers = []
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')
      sh = logging.StreamHandler()
      formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
      sh.setFormatter(formatter)
      self.logger.addHandler(sh)
      self.logger.setLevel(logging.INFO)
      self.collection = {}
      self.collection_mutex = Lock()

   def run(self):
      self.logger.info('Focused Web Crawler launched')
      self.logger.info('Starting workers')
      for i in xrange(self.nworkers):
         worker = Worker(self.queue, self.visited_urls, self.mutex, self.collection, self.collection_mutex)
         self.workers.append(worker)
         worker.start()

      self.queue.join() # Wait until all items are consumed

      for i in xrange(self.nworkers): # send a 'None signal' to finish workers
         self.queue.put(None)

      self.queue.join() # Wait until all workers are notified

#     for worker in self.workers:
#        worker.join()

      self.logger.info('Finished workers')
      self.logger.info('Focused Web Crawler finished')

worker.py

from threading import Thread
from fetcher import fetch
from evaluator import get_all_links, get_all_feeds
from collector import collect
from urllib2 import HTTPError
import logging

class Worker(Thread):
   def __init__(self, queue, visited_urls, mutex, collection, collection_mutex):
      Thread.__init__(self)
      self.queue = queue
      self.visited_urls = visited_urls
      self.mutex = mutex
      self.collection = collection
      self.collection_mutex = collection_mutex
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')

   def run(self):
      item = self.queue.get()

      while item != None:
         try:
            url = item['url']
            key = item['key']
            constraint = item['constraint']
            data = fetch(url)

            if data == None:
               self.logger.info('Not fetched: %s because type != text/html', url)
            else:
               links = get_all_links(data, base = url)
               feeds = get_all_feeds(data, base = url)
               interesting = collect(links)

               if interesting:
                  self.collection_mutex.acquire()
                  if key not in self.collection:
                     self.collection[key] = {'feeds':{}}

                  if feeds:
                     for feed in feeds:
                        self.collection[key]['feeds'][feed['href']] = feed['type']

                  for service, accounts in interesting.items():
                     if service not in self.collection[key]:
                        self.collection[key][service]  = {}

                     for a,u in accounts.items():
                        self.collection[key][service][a] = {'url': u, 'depth':constraint.depth}
                  self.collection_mutex.release()

               for l in links:
                  new_constraint = constraint.inherit(url, l)
                  if new_constraint == None:
                     continue

                  self.mutex.acquire()
                  if l not in self.visited_urls:
                     self.queue.put({'url':l, 'key':key, 'constraint': new_constraint})
                     self.visited_urls.add(l)
                  self.mutex.release()

         except HTTPError:
            self.logger.info('HTTPError exception on url: %s', url)

         self.queue.task_done()

         item = self.queue.get()

      self.queue.task_done() # task_done on None

fetcher.py

import urllib2
import logging

def fetch(uri):
   fetch.logger.info('Fetching: %s', uri)
   #logger = logging.getLogger('data_big_bang.focused_web_crawler')
   print uri

   h = urllib2.urlopen(uri)
   if h.headers.type == 'text/html':
      data = h.read()
   else:
      data = None

   return data

fetch.logger = logging.getLogger('data_big_bang.focused_web_crawler')

evaluator.py

import lxml.html
import urlparse

def get_all_links(page, base = ''):
   doc = lxml.html.fromstring(page)
   links = map(lambda x: urlparse.urljoin(base, x.attrib['href']), filter(lambda x: 'href' in x.attrib, doc.xpath('//a')))

   return links

def get_all_feeds(page, base = ''):
   doc = lxml.html.fromstring(page)

   feeds = map(lambda x: {'href':urlparse.urljoin(base, x.attrib['href']),'type':x.attrib['type']}, filter(lambda x: 'type' in x.attrib and x.attrib['type'] in ['application/atom+xml', 'application/rss+xml'], doc.xpath('//link')))

   return feeds

constraint.py

import urlparse
from parse_domain import parse_domain

class Constraint:
   DEPTH = 1
   def __init__(self):
      self.depth = 0

   def inherit(self, base_url, url):
      base_up = urlparse.urlparse(base_url)
      up = urlparse.urlparse(url)

      base_domain = parse_domain(base_url, 2)
      domain = parse_domain(url, 2)

      if base_domain != domain:
         return None

      if self.depth >= Constraint.DEPTH: # only crawl two levels
         return None
      else:
         new_constraint = Constraint()
         new_constraint.depth = self.depth + 1

         return new_constraint

collector.py

import urlparse
import re

twitter = re.compile('^http://twitter.com/(#!/)?(?P[a-zA-Z0-9_]{1,15})$')

def collect(urls):
   collection = {'twitter':{}}
   for url in urls :
      up = urlparse.urlparse(url)
      hostname = up.hostname

      if hostname == None:
         continue

      if hostname == 'www.facebook.com':
         pass
      elif hostname == 'twitter.com':
         m = twitter.match(url)

         if m:
            gs = m.groupdict()
            if 'account' in gs:
               if gs['account'] != 'share': # this is not an account, although http://twitter.com/#!/share says that this account is suspended.
                  collection['twitter'][gs['account']] = url
      elif hostname == 'www.linkedin.com':
         pass
      elif hostname == 'plus.google.com':
         pass
      elif hostname == 'www.slideshare.net':
         pass
      elif hostname == 'www.youtube.com':
         pass
      elif hostname == 'www.flickr.com':
         pass
      elif hostname[-9:] == '.xing.com':
         pass
      else:
         continue

   return collection

Further Work

This process can be integrated with a variety of CRM and business intelligence processes like Salesforce, Microsoft Dynamics, and SAP. These applications provide APIs to retrieve company URLs which you can crawl with our script.

The discovery process is just the first step in studying your prospective customers and generating leads. Once you have stored the sources of company information it is possible to apply machine learning tools to search for more opportunities.

See Also

  1. Enriching a List of URLs with Google Page Rank
  2. Integrating Google Analytics into your Company Loop with a Microsoft Excel Add-on

Resources

  1. Sales process
  2. Sales process engineering
  3. Microsoft Dynamics API
  4. Salesforce API
  5. SAP API
  6. SugarCRM Developer Zone