Adding Acknowledgement Semantics to a Persistent Queue

Persistence alone is not enough to ensure the reliability of message oriented middleware. Suppose that you retrieve an item from a queue and the application or thread crashes in the middle of processing it. Because the crash occurred after the item was removed from the queue, both the item and any processes depending on it are lost. Acknowledgement semantics prevent this loss: if the application crashes before acknowledging an item, the item remains available to other consumers until an acknowledgement is sent.

The Python code below shows how to add acknowledgement semantics to a class derived from the Python Queue class. In the article Persisting Native Python Queues we only showed how to persist a queue. Note that we have extended the base Python Queue class with two methods, “connect” and “ack”. Each application thread must call “connect” before using the queue object; “connect” returns a unique queue proxy. If the thread crashes, the items that were fetched through that proxy but not acknowledged are enqueued again. The “ack” method takes the item returned by the “get” method and effectively removes it from the queue. In this code ZODB is used for persistence instead of DyBASE. If the entire application crashes, not just a single thread, unacknowledged items are requeued when it restarts.
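
Before going into the implementation, this is a minimal sketch of the intended connect/get/ack cycle from a consumer thread's point of view. The class name, file name, and the exact split of responsibilities between the queue and its proxy are assumptions based on the description above, not verbatim API from the released code:

# hedged sketch of the acknowledgement cycle described above
q = PersistentQueue("myqueue.fs")  # hypothetical ZODB-backed queue file
proxy = q.connect()                # one proxy per application thread

item = proxy.get()                 # fetch an item; it is now pending, not removed
try:
	process(item)                  # hypothetical application work
	proxy.ack(item)                # only now is the item effectively removed
except Exception:
	# no ack was sent: after a crash or restart the item is requeued
	# and becomes available to other consumers
	raise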

While acknowledgement semantics increase reliability, they are not infallible. Imagine that after processing an acknowledged item, the result of that processing is also added to the queue. In some web crawling implementations, a URL is first retrieved from the queue and acknowledged, then the HTML page at that URL is fetched, and finally the links on that page are inserted into the queue. Two problems can occur if the application or thread crashes during this process. If items, in this case URLs, are acknowledged and thus eliminated as soon as they are retrieved, they may be eliminated before all of the links on the page have been enqueued; the remaining links are then lost. If, on the other hand, items are acknowledged only after all the links have been enqueued, some links may be duplicated. This conflict is solved with queue transaction semantics: if the process or thread crashes, a rollback is performed.
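
The transactional variant can be sketched as follows. The begin, commit, and rollback methods are hypothetical; the class presented in this article does not implement them, the sketch only illustrates the semantics just described:

# hypothetical transactional crawl step; these transaction methods do not
# exist in the class below, they only illustrate the intended semantics
tx = q.begin()                       # start a queue transaction
url = q.get()                        # fetch a URL inside the transaction
try:
	page = fetch(url)                # hypothetical HTTP fetch
	for link in extract_links(page): # hypothetical link extraction
		q.put(link)                  # the new links belong to the same transaction
	q.ack(url)
	tx.commit()                      # the ack and the puts become visible atomically
except Exception:
	tx.rollback()                    # crash or error: neither the ack nor the puts are applied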

Notes

  1. This persistent queue with acknowledgement assumes that all objects in the queue have different identities, that is, id(obj_i) != id(obj_j) whenever i != j. Making a copy of the object works for mutable objects; immutable objects must be wrapped.
  2. The classes of the objects stored in the queue must inherit from ZODB's Persistent class, and so must their object-valued members (see the sketch after this list).
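
A minimal sketch of note 2, assuming ZODB's persistent package; the class names are hypothetical and only illustrate that both the queued object and its members derive from Persistent:

from persistent import Persistent

class Payload(Persistent):
	def __init__(self, data):
		self.data = data  # plain attributes are fine

class Job(Persistent):
	def __init__(self, url, data):
		self.url = url
		self.payload = Payload(data)  # object members must be Persistent too (note 2)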

Prerequisites

  1. Python 2.x (x>=6)
  2. ZODB3

Code

The code is available on GitHub and includes a series of unit tests.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Persisting Native Python Queues

Resources

  1. AMQP Acknowledgement
  2. HornetQ Asynchronous Send Acknowledgements
  3. HornetQ Transactions
  4. ZODB In Real Life
  5. Storing Persistent Objects with Persistent Objects as attributes of the Parent PO


Persisting Native Python Queues

Native Python queues do not allow you to stop and resume an application without losing queue items. Adding object persistence to a derived Python Queue class partially addresses this issue. We use the DyBASE embedded object oriented database to persist queue items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue class, it works in multithreaded environments. Transactional support, such as acknowledging successfully processed queue items, is not currently a feature of this class.

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevance of queues for connecting heterogeneous technologies. Queues are also used within a single technology to implement the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues persists its items. The Microsoft Windows Azure platform incorporates persistent queues but has other limitations, and may also be overkill for your solution.

There are several ways to persist a queue. If the items you want to persist have a fixed buffer length, then Berkeley DB’s queues or STXXL’s queues work well. You cannot use database managers like GDBM for a FIFO queue, since you need to traverse the elements in order and a hash table does not guarantee that order. STXXL and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite, which is useful in many scenarios, but an SQL engine adds complexity that a FIFO queue does not require.

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available on GitHub.

#!/usr/bin/python

from Queue import Queue
import dybase
import sys

MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1

#DEBUG = True
DEBUG = False

class Root(dybase.Persistent):
	def __init__(self):
		self.start = 0
		self.stop = 0

class SizeOfPersistentQueueExceeded(Exception):
	pass

class incomplete_persistent_deque:
	def __init__(self, filename):
		self._init_db(filename)

	def _init_db(self, filename):
		self.db = dybase.Storage()
		if self.db.open(filename):
			self.root = self.db.getRootObject()
			if self.root == None:
				self.root = Root()
				self.root.elements = self.db.createIntIndex() # createLongIndex can be used on 64-bit systems, but it is unusual to push more than 2**32 elements through the queue
				self.root.pending_elements = self.db.createIntIndex()

				self.db.setRootObject(self.root)
				self.db.commit()
			else:
				if DEBUG:
					print "self.root already exists"

		if DEBUG:
			print "self.root.start =", self.root.start
			print "self.root.stop = ", self.root.stop

	def __len__(self):
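		# the stop counter may have wrapped past MAX_INT to MIN_INT; handle both cases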
		if self.root.stop >= self.root.start:
			return self.root.stop - self.root.start
		else:
			return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)

	def append(self, item):
		# add element to index
		self.root.elements.insert(self.root.stop, item)
		self.root.stop += 1
		if self.root.stop > MAX_INT:
			# check also if stop touches start
			self.root.stop = MIN_INT

		if self.root.start == self.root.stop:
			raise SizeOfPersistentQueueExceeded

		# persist
		self.root.store()
		self.db.commit()

	def popleft(self):
		# no need to check for emptiness here; the Queue base class takes care of that
		# remove element from index
		item = self.root.elements.get(self.root.start)
		self.root.elements.remove(self.root.start)
		self.root.start += 1
		if self.root.start > MAX_INT:
			# check also if start touches stop
			self.root.start = MIN_INT 

		if self.root.start == self.root.stop: # if the queue is empty, resync start & stop to 0; this is only cosmetic and can be removed
			self.root.start = 0
			self.root.stop = 0

		# persist
		self.root.store()
		self.db.commit()

		return item

class PersistentQueue(Queue):
	def __init__(self, filename, maxsize = 0):
		self.filename = filename
		Queue.__init__(self, maxsize)

	def _init(self, maxsize):
		# original: self.queue = deque()

		# incomplete_persistent_deque:
		# - incomplete implementation but enough for Queue:
		# - implemented methods:
		# -- __len__
		# -- append
		# -- popleft
		#

		self.queue = incomplete_persistent_deque(self.filename)

	def connect(self): # to handle failovers
		pass

	def ack(self):
		pass

	#def ack(self, item):

class ElementTest:
	def __init__(self, value):
		self.value = value

	def __repr__(self):
		return self.value

	def __str__(self):
		return self.value

def test1():
	q = PersistentQueue("myqueue.dbs")
	if not q.empty(): # get pending items
		while not q.empty():
			e = q.get()
			print e

	for s in ['one', 'two', 'three']:
		q.put(ElementTest(s))

def main(): # run this script twice to see the persisted elements
	test1()

if __name__ == '__main__':
	main()

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template

Using Queues in Web Crawling and Analysis Infrastructure

Message oriented middleware (MOM) is a key technology for implementing a custom pipeline and analyzing unstructured data. The pipeline from crawling web pages to part of speech tagging (PoST) and beyond is long. It requires a variety of processes which are implemented in several different programming languages and operating systems. For example, boilerpipe is an excellent Java library for extracting main text content, while PoST libraries, like NLTK or FreeLing, are implemented in Python.

One might be tempted to integrate different technologies using web services, but web services alone have many weak points. If the pipeline has ten processes and, for example, the last one fails, then the intermediate results can be lost if they are not persisted. There must be a higher level mechanism in place to resume the pipeline processing. MOMs ensure message persistence until a consumer acknowledges that a specific process has finished.

There are a lot of MOMs to choose from, including commercial and free open source variants. Some features are present in almost all of them while others are not. Contention management is an important feature if you are dealing, as is likely, with a high ratio of messages produced to messages consumed at any one time. For example, a web crawler can fetch web pages at an incredibly high speed while processes like content extraction take longer. Running a message queue without contention management under these circumstances will exhaust the machine’s memory.

While MOMs are important for uniting heterogeneous technologies, the different processes must also know which queues to utilize to consume the input and produce the output for the next phases. A new wave of frameworks like NServiceBus, Resque, Celery, and Octobot has emerged to handle this.

In conclusion, MOMs help to connect heterogeneous technologies and bring robustness, and are very useful in the context of unstructured information such as text analysis. Many MOMs are available, but no single one has a complete feature set. However, some of the missing features can be supplied by frameworks such as NServiceBus, Resque, Celery, and Octobot.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. Message Queues vs Web Services
  2. Message Queue Evaluation Notes
  3. The Hadoop Map-Reduce Capacity Scheduler
  4. Contention Management in the WSA
  5. Message Queuing Architectures

Integrating Google Analytics into your Company Loop with a Microsoft Excel Add-on


Introduction

Google Analytics and AdWords are essential marketing and sales tools. They can be integrated with the ubiquitous Microsoft Excel via the Google Data API. Data Big Bang’s Nicolas Papagna has developed an Excel add-on which can be downloaded here. This plugin enables Excel users to quickly retrieve Google Analytics data using the available Google Analytics metrics and dimensions, sorted by the user’s criteria. One advantage of our solution is that Excel accesses the Google Analytics API directly instead of going through Data Big Bang’s servers. Other solutions need access to your information, which exposes your private data to third parties.

Installation and Usage

  1. Download GoogleAnalyticsToExcel.AddInSetup_1.0.20.0.exe.
  2. Install it.
  3. Run Microsoft Excel.
  4. Configure your Google credentials by clicking on “Settings” under the “Google Analytics to Excel Addin” ribbon tab.
  5. Customize your query and retrieve your Google Analytics data by clicking the “Query Google Analytics” button.

Development Notes

Data Big Bang’s research team has also developed an OData web service that can be consumed using applications such as PowerPivot, Tableau, and LINQPad. This web service doesn’t require any add-ons. However, since unfortunately neither PowerPivot nor Tableau offers a query builder for interacting with OData providers, users must know how to craft the OData URL query themselves. The most interesting part of this project was developing a Google Data Protocol to Open Data Protocol .NET class that offers an IQueryable interface to convert LINQ queries to GData. LINQ queries add a lot of expressive power beyond GData.

See Also

  1. Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website
  2. Integrating Dropbox with Microsoft Outlook
  3. Exporting StackOverflow users blogs to Excel Hyperlinks

Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website

Searching for Sales Leads

The best definition of “marketing” I have read is by Dave Kellog in the What the CEO Really Thinks of Marketing (And 5 Things You Can Do About It) presentation. He says that marketing exists to make sales easier. For example, the process of searching for sales opportunities can be optimized if we pay attention to what our prospective and current customers are sharing on different social media. Good corporate blogs include insightful information about the company’s aims. The first step in this direction is to discover what web resources a specific company has available. The discovery process is easier for companies than for individuals: individuals use a variety of aliases and alternative identities on the web, while companies with good communication strategies provide links to all of their web resources on their primary sites.

Discovery

We offer a script which retrieves web resources connected to any company’s URL. With this tool you will no longer waste time manually searching for this useful information. Companies and people usually have a number of associated sites: blogs; LinkedIn accounts; Twitter accounts; Facebook pages; and videos and photos on specialized sites such as YouTube, Vimeo, Flickr, or Picasa. A recursive level of page crawling is needed to retrieve the location of associated resources. Large companies such as IBM or Dell have multiple accounts associated with different areas. IBM has different Twitter accounts for its research divisions and for important corporate news.

Usage

fwc.py <input.yaml> <output.yaml>

Look at data-science-organizations.yaml for an example.
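
The exact contents of that file are not reproduced here, but based on the fields the crawler reads (each worker expects a url and a key, and main() attaches the constraint itself), the input presumably looks something like this hypothetical example:

# input.yaml (hypothetical; only url and key are read from the file)
- url: http://www.example.com
  key: example
- url: http://www.databigbang.com
  key: databigbang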

Prerequisites

  1. Python 2.7 (or greater 2.x series)
  2. lxml.html
  3. parse_domain.py
  4. PyYAML

Script

This code is available on GitHub.

fwc.py

#!/usr/bin/python2.7

import argparse
import sys
from focused_web_crawler import FocusedWebCrawler
import logging
import code
import yaml
from constraint import Constraint

def main():
   logger = logging.getLogger('data_big_bang.focused_web_crawler')
   ap = argparse.ArgumentParser(description='Discover web resources associated with a site.')
   ap.add_argument('input', metavar='input.yaml', type=str, nargs=1, help ='YAML file indicating the sites to crawl.')
   ap.add_argument('output', metavar='output.yaml', type=str, nargs=1, help ='YAML file with the web resources discovered.')

   args = ap.parse_args()

   input = yaml.load(open(args.input[0], "rt"))

   fwc = FocusedWebCrawler()

   for e in input:
      e.update({'constraint': Constraint()})
      fwc.queue.put(e)

   fwc.start()
   fwc.join()

   with open(args.output[0], "wt") as s:
      yaml.dump(fwc.collection, s, default_flow_style = False)

if __name__ == '__main__':
   main()

focused-web-crawler.py

from threading import Thread, Lock
from worker import Worker
from Queue import Queue
import logging

class FocusedWebCrawler(Thread):
   NWORKERS = 10
   def __init__(self, nworkers = NWORKERS):
      Thread.__init__(self)
      self.nworkers = nworkers
      #self.queue = DualQueue()
      self.queue = Queue()
      self.visited_urls = set()
      self.mutex = Lock()
      self.workers = []
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')
      sh = logging.StreamHandler()
      formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
      sh.setFormatter(formatter)
      self.logger.addHandler(sh)
      self.logger.setLevel(logging.INFO)
      self.collection = {}
      self.collection_mutex = Lock()

   def run(self):
      self.logger.info('Focused Web Crawler launched')
      self.logger.info('Starting workers')
      for i in xrange(self.nworkers):
         worker = Worker(self.queue, self.visited_urls, self.mutex, self.collection, self.collection_mutex)
         self.workers.append(worker)
         worker.start()

      self.queue.join() # Wait until all items are consumed

      for i in xrange(self.nworkers): # send a 'None signal' to finish workers
         self.queue.put(None)

      self.queue.join() # Wait until all workers are notified

#     for worker in self.workers:
#        worker.join()

      self.logger.info('Finished workers')
      self.logger.info('Focused Web Crawler finished')

worker.py

from threading import Thread
from fetcher import fetch
from evaluator import get_all_links, get_all_feeds
from collector import collect
from urllib2 import HTTPError
import logging

class Worker(Thread):
   def __init__(self, queue, visited_urls, mutex, collection, collection_mutex):
      Thread.__init__(self)
      self.queue = queue
      self.visited_urls = visited_urls
      self.mutex = mutex
      self.collection = collection
      self.collection_mutex = collection_mutex
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')

   def run(self):
      item = self.queue.get()
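      # a None item is the shutdown sentinel that FocusedWebCrawler puts on the queue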

      while item != None:
         try:
            url = item['url']
            key = item['key']
            constraint = item['constraint']
            data = fetch(url)

            if data == None:
               self.logger.info('Not fetched: %s because type != text/html', url)
            else:
               links = get_all_links(data, base = url)
               feeds = get_all_feeds(data, base = url)
               interesting = collect(links)

               if interesting:
                  self.collection_mutex.acquire()
                  if key not in self.collection:
                     self.collection[key] = {'feeds':{}}

                  if feeds:
                     for feed in feeds:
                        self.collection[key]['feeds'][feed['href']] = feed['type']

                  for service, accounts in interesting.items():
                     if service not in self.collection[key]:
                        self.collection[key][service]  = {}

                     for a,u in accounts.items():
                        self.collection[key][service][a] = {'url': u, 'depth':constraint.depth}
                  self.collection_mutex.release()

               for l in links:
                  new_constraint = constraint.inherit(url, l)
                  if new_constraint == None:
                     continue

                  self.mutex.acquire()
                  if l not in self.visited_urls:
                     self.queue.put({'url':l, 'key':key, 'constraint': new_constraint})
                     self.visited_urls.add(l)
                  self.mutex.release()

         except HTTPError:
            self.logger.info('HTTPError exception on url: %s', url)

         self.queue.task_done()

         item = self.queue.get()

      self.queue.task_done() # task_done on None

fetcher.py

import urllib2
import logging

def fetch(uri):
   fetch.logger.info('Fetching: %s', uri)
   #logger = logging.getLogger('data_big_bang.focused_web_crawler')
   print uri

   h = urllib2.urlopen(uri)
   if h.headers.type == 'text/html':
      data = h.read()
   else:
      data = None

   return data

fetch.logger = logging.getLogger('data_big_bang.focused_web_crawler')

evaluator.py

import lxml.html
import urlparse

def get_all_links(page, base = ''):
   doc = lxml.html.fromstring(page)
   # resolve every <a href="..."> against the base URL
   links = [urlparse.urljoin(base, a.attrib['href']) for a in doc.xpath('//a') if 'href' in a.attrib]

   return links

def get_all_feeds(page, base = ''):
   doc = lxml.html.fromstring(page)

   # keep only <link> tags that declare an Atom or RSS feed
   feeds = [{'href': urlparse.urljoin(base, l.attrib['href']), 'type': l.attrib['type']}
            for l in doc.xpath('//link')
            if 'type' in l.attrib and l.attrib['type'] in ['application/atom+xml', 'application/rss+xml']]

   return feeds

constraint.py

import urlparse
from parse_domain import parse_domain

class Constraint:
   DEPTH = 1
   def __init__(self):
      self.depth = 0

   def inherit(self, base_url, url):
      base_up = urlparse.urlparse(base_url)
      up = urlparse.urlparse(url)

      base_domain = parse_domain(base_url, 2)
      domain = parse_domain(url, 2)

      if base_domain != domain:
         return None

      if self.depth >= Constraint.DEPTH: # only crawl two levels
         return None
      else:
         new_constraint = Constraint()
         new_constraint.depth = self.depth + 1

         return new_constraint

collector.py

import urlparse
import re

twitter = re.compile('^http://twitter.com/(#!/)?(?P<account>[a-zA-Z0-9_]{1,15})$')

def collect(urls):
   collection = {'twitter':{}}
   for url in urls :
      up = urlparse.urlparse(url)
      hostname = up.hostname

      if hostname == None:
         continue

      if hostname == 'www.facebook.com':
         pass
      elif hostname == 'twitter.com':
         m = twitter.match(url)

         if m:
            gs = m.groupdict()
            if 'account' in gs:
               if gs['account'] != 'share': # this is not an account, although http://twitter.com/#!/share says that this account is suspended.
                  collection['twitter'][gs['account']] = url
      elif hostname == 'www.linkedin.com':
         pass
      elif hostname == 'plus.google.com':
         pass
      elif hostname == 'www.slideshare.net':
         pass
      elif hostname == 'www.youtube.com':
         pass
      elif hostname == 'www.flickr.com':
         pass
      elif hostname[-9:] == '.xing.com':
         pass
      else:
         continue

   return collection

Further Work

This process can be integrated with a variety of CRM and business intelligence products like Salesforce, Microsoft Dynamics, and SAP. These applications provide APIs to retrieve company URLs, which you can then crawl with our script.

The discovery process is just the first step in studying your prospective customers and generating leads. Once you have stored the sources of company information it is possible to apply machine learning tools to search for more opportunities.

See Also

  1. Enriching a List of URLs with Google Page Rank
  2. Integrating Google Analytics into your Company Loop with a Microsoft Excel Add-on

Resources

  1. Sales process
  2. Sales process engineering
  3. Microsoft Dynamics API
  4. Salesforce API
  5. SAP API
  6. SugarCRM Developer Zone

Helping Search Engines to Find Content in the Invisible Web

Discovering Hidden Web Resources

Search engines and social networks are digital telescopes: it is extremely difficult and time consuming to find web resources outside of their lens. Searching for them is a craft. Our intuition tells us that there is interesting, invisible information out there, but we cannot touch it.

IMDB contains a lot of information about users, but the site only offers sharing as a collateral feature. If we search on Google we cannot find all the users sharing their movie rankings. At the time of writing, the query site:imdb.com inurl:”user/*/ratings” was returning only a few results on Google. How can we help people find more web resources through search engines? This article shows the first 10 million results of the Distributed Scraping With Multiple Tor Circuits process. In a short time Google will index this article and include these new resources so everyone can find them.

In the meantime you have the honor of seeing web resources that are invisible to search engines. This page contains the first 10 million IMDB users sharing their movie ratings. We have included a script below to fetch their ratings, taking advantage of the comma separated value export offered by IMDB.

Python Code for Exporting IMDB Ratings in Comma Separated Values

get-user-ratings.py

#!/usr/bin/python2.7

import pymongo
import urllib2

MONGODB_HOSTNAME = 'localhost'

HTML = """
<html>
<body>
{0}
</body>
</html>
"""

EXPORT_URL = "http://www.imdb.com/list/export?list_id=ratings&author_id={0}"

def main():
   conn = pymongo.Connection(MONGODB_HOSTNAME, 27017)
   db = conn.scraping
   coll = db.imdb.ratings

   items = coll.find({'last_response':200})

   links = ""

   i = 0
   for item in items:
      url = item['url']
      index = 'ur{0:07}'.format(item['index'])
      filename = 'ur{0}.csv'.format(item['index'])
      links += "<a href='{0}'>{1}</a><br>".format(url, index)

      with open(filename, "wt") as h:
         h.write(urllib2.urlopen(EXPORT_URL.format(index)).read())

   print HTML.format(links)

if __name__ == '__main__':
   main()

Resources

  1. Discovering URLs through User Feedback
  2. Invisible Web
  3. Deep Web Research 2012


Running Console Applications with Invisible Windows

Hiding Console Application Windows

With the simple free source code and executable application below you can launch and run multiple console applications using cmd.exe without displaying the console window. This code eliminates the need to build a service, and is a useful complement to the Distributed Scraping With Multiple Tor Circuits article for those running Tor under the Microsoft Windows operating system. It includes a batch file to enable you to run multiple Tor proxies. Other companies charge for a similar application.

Application

You can download the application hideconsole.exe here.

Code

This is the code. If you want to download the entire Visual Studio 2010 project or fork it, please use the GitHub project.

// hideconsole.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "windows.h"
#include <string>
#include <iostream>
#include <stdio.h>

namespace std
{
#ifdef _UNICODE
	typedef wstring tstring;
	#define tout std::wcout
#else
	typedef string tstring;
	#define tout std::cout
#endif
}
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
	if(argc == 1) {
		//wprintf(_T("Usage: %s <cmd> [<parameter1>..<parametern>]"));
		tout << "Usage: " << argv[0] << " <cmd> [<parameter1>..<parametern>]" << endl;
		return 0;
	}

	tstring file = argv[1];
	tstring parameters;

	for(int i=2; i < argc; i++) {
		if( i != 2)
			parameters.append(_T(" "));
		parameters.append(argv[i]);
	}

	ShellExecute(NULL, _T("open"), file.c_str(), parameters.c_str(), NULL, SW_HIDE);
	tout << "Running cmd = " << file << endl;
	tout << "Arguments = " << parameters << endl;

	return 0;
}

Batch for Running Multiple Tor Instances

mytor.bat

@echo off
echo %3%
IF EXIST data\tor%3 GOTO DATASUBDIREXISTS
	mkdir data\tor%3
:DATASUBDIREXISTS
.\hideconsole.exe c:\windows\system32\cmd.exe /k """"""c:\Program Files (x86)\tor\tor.exe""" --RunAsDaemon 1 --CookieAuthentication 0 --HashedControlPassword """" --SocksListenAddress 192.168.0.178 --ControlPort %2 --PidFile tor%3.pid --SocksPort %1 --DataDirectory data\tor%3""""

start-multiple-tor.bat

@echo off
setlocal enabledelayedexpansion

set /A base_socks_port=9050
set /A base_control_port=8118
set /A idx=0

IF EXIST data GOTO DATAEXISTS
mkdir data

:DATAEXISTS

FOR /L %%i IN (1,1,80) DO (
	call .\mytor !base_socks_port! !base_control_port! !idx!
	set /A base_socks_port+=1
	set /A base_control_port+=1
	set /A idx+=1
	rem echo !idx!
)

Note

  • The multiple quotes that you see in the batch file are necessary since we are escaping quotes three times.

Resources

  1. CMD.EXE: Escape Characters, Delimiters and Quotes
  2. CMD.EXE Escape Character, Caret. OK, But What About Backslash?


Running Your Own Anonymous Rotating Proxies

Rotating Proxies with HAProxy

Most web browsers and scrapers can only be configured to use one proxy per protocol. You can get around this limitation by running different instances of browsers and scrapers. Google Chrome and Firefox allow multiple profiles. However, running hundreds of browser instances is unwieldy.

A better option is to set up your own proxy that rotates among a set of Tor proxies. The Tor application implements a SOCKS proxy. Start multiple Tor instances on one or more machines and networks, then configure and run an HTTP load balancer to expose a single point of connection instead of adding rotation logic to the client application. In the Distributed Scraping With Multiple Tor Circuits article we learned how to set up multiple Tor SOCKS proxies for web scraping and crawling, but that sample code launched multiple threads, each of which uses a different proxy. In this example we instead use the HAProxy load balancer with a round-robin strategy to rotate our proxies.

When you are crawling and scraping sites that rely on Javascript, using a real browser with a high performance Javascript engine like V8 may be the best approach; simply configuring our rotating proxy in the browser does the trick. Another option is HTMLUnit, but the V8 Javascript engine parses web pages and runs Javascript more quickly. If you use a browser you must be particularly careful to keep the scraped site from correlating your multiple requests: try disabling cookies, local storage, and image loading, enabling only Javascript, and caching as many requests as possible. If you need to support cookies, you have to run different browsers with different profiles.

Setup and Configuration

Prerequisites

  1. Tor
  2. DeleGate
  3. HAProxy

HAProxy Configuration File

rotating-tor-proxies.cfg

global
        daemon
        maxconn 256

defaults
        mode http
        timeout connect 5000ms
        timeout client 50000ms
        timeout server 50000ms

frontend rotatingproxies
        bind *:3128
        default_backend tors
        option http_proxy

backend tors
        option http_proxy
        server tor1 localhost:3129
        server tor2 localhost:3130
        server tor3 localhost:3131
        server tor4 localhost:3132
        server tor5 localhost:3133
        server tor6 localhost:3134
        server tor7 localhost:3135
        server tor8 localhost:3136
        server tor9 localhost:3137
        server tor10 localhost:3138
        balance roundrobin

Running

Run the following script, which launches many instances of Tor, then runs one DeleGate instance per Tor, and finally runs HAProxy to rotate the proxy servers. We have to use DeleGate because HAProxy does not support SOCKS.

#!/bin/bash
base_socks_port=9050
base_http_port=3129 # leave 3128 for HAProxy
base_control_port=8118

# Create data directory if it doesn't exist
if [ ! -d "data" ]; then
	mkdir "data"
fi

#for i in {0..10}
for i in {0..9}

do
	j=$((i+1))
	socks_port=$((base_socks_port+i))
	control_port=$((base_control_port+i))
	http_port=$((base_http_port+i))
	if [ ! -d "data/tor$i" ]; then
		echo "Creating directory data/tor$i"
		mkdir "data/tor$i"
	fi
	# Take into account that authentication for the control port is disabled. Must be used in secure and controlled environments

	echo "Running: tor --RunAsDaemon 1 --CookieAuthentication 0 --HashedControlPassword \"\" --ControlPort $control_port --PidFile tor$i.pid --SocksPort $socks_port --DataDirectory data/tor$i"

	tor --RunAsDaemon 1 --CookieAuthentication 0 --HashedControlPassword "" --ControlPort $control_port --PidFile tor$i.pid --SocksPort $socks_port --DataDirectory data/tor$i

	echo 	"Running: ./delegate/src/delegated -P$http_port SERVER=http SOCKS=localhost:$socks_port"

	./delegate/src/delegated -P$http_port SERVER=http SOCKS=localhost:$socks_port
done

haproxy -f rotating-tor-proxies.cfg
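
Once HAProxy is listening, any HTTP client can point at the single frontend and its requests are spread across the Tor exits. A minimal client sketch with Python's urllib2, assuming the setup above is running on localhost:

#!/usr/bin/python
# minimal client sketch: every request goes through the HAProxy frontend on
# port 3128, which round-robins across the DeleGate/Tor backends configured above
import urllib2

proxy = urllib2.ProxyHandler({'http': 'http://localhost:3128'})
opener = urllib2.build_opener(proxy)

for i in range(3):
	# consecutive requests should leave through different backends, hence different circuits
	print opener.open('http://www.example.com/').getcode()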

See Also

  1. Distributed Scraping With Multiple Tor Circuits
  2. Web Scraping Ajax and Javascript Sites

Resources

  1. HAProxy The Reliable, High Performance TCP/HTTP Load Balancer
  2. DeleGate Multi-Purpose Application Level Gateway
  3. Python twisted proxyclient cascade / upstream to squid
  4. How SOPA’s ‘circumvention’ ban could put a target on Tor

Distributed Scraping With Multiple Tor Circuits

Multiple Circuit Tor Solution

When you rapidly fetch different web pages from a single IP address, you risk getting stuck in the middle of the scraping. Some sites completely ban scrapers, while others enforce a rate limit policy. For example, if you automate Google searches, Google will require you to solve captchas: Google is confused by many people using the same IP, and by search junkies. It used to be costly to get enough IPs to build a good scraping infrastructure. Now there are alternatives: cheap rotating proxies and Tor. Other options include specialized crawling and scraping services like 80legs, or even running Tor on AWS EC2 instances. The advantage of running Tor is its widespread network coverage, and it is free of charge. Unfortunately, Tor does not let you control bandwidth and latency.

All navigation performed during a single Tor session is associated with the same exit point and its IP addresses. To renew these IP addresses you must restart Tor, send a NEWNYM signal, or, as in our case study, run multiple Tor instances at the same time by assigning a different port to each one. Many SOCKS proxies will then be ready for use. It is possible for more than one instance to share the same circuit, but that is beyond the scope of this article.
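
For completeness, the NEWNYM signal can be sent over a Tor control port as in the sketch below. It assumes the control port is open without authentication, exactly as configured by the launcher script further down, which is only acceptable in a secure and controlled environment:

#!/usr/bin/python
# sketch: ask one Tor instance for a fresh circuit via its control port;
# assumes CookieAuthentication 0 and an empty HashedControlPassword as in
# the launcher script below (no authentication, lab use only)
import socket

def renew_circuit(control_port, host='localhost'):
	s = socket.create_connection((host, control_port))
	s.send('AUTHENTICATE ""\r\n')  # empty password because no auth is configured
	s.send('SIGNAL NEWNYM\r\n')    # request new circuits for future connections
	s.send('QUIT\r\n')
	print s.recv(1024)             # expect a series of "250 OK" replies
	s.close()

renew_circuit(8118)  # base_control_port in the launcher script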

IMDB: A Case Study

If you like movies, the Internet Movie Database is omnipresent in your daily life. IMDB users have always been able to share their movies and lists. Recently, however, the site turned previously shared public movie ratings private by default. Useful movie ratings disappeared from the Internet with this change, and most of those that were manually set back to public are not indexed by search engines. All links that previously pointed to user ratings are now broken since the URLs have changed. How can you find all the public ratings available on IMDB?
If you follow IMDB’s scraping policy it will take years, since the site contains tens of millions of user pages. Distributed scraping is the best way to solve this issue and quickly discover which users are sharing their ratings. Our method just retrieves the HTTP response code to find out whether a user is sharing their ratings.

Our code sample has three elements:

  1. Multiple Tor instances listening to different ports. The result is many SOCKS proxies available for use with different Tor circuits.
  2. A Python script that launches multiple workers in different threads. Each worker uses a different SOCKS port.
  3. MongoDB to persist the state of the scraping if the process fails or if you want to stop the process and continue later.

Shell Script and Source Code

Prerequisites

  1. Tor
  2. MongoDB
  3. PyMongo
  4. SocksiPy
  5. Python

Multiple Tor Launcher

You must run the following script before running the Python script. To adjust the number of Tor instances just change the interval in the loop.

#!/bin/bash

base_socks_port=9050
base_control_port=8118

# Create data directory if it doesn't exist
if [ ! -d "data" ]; then
	mkdir "data"
fi

#for i in {0..10}
for i in {0..80}

do
	j=$((i+1))
	socks_port=$((base_socks_port+i))
	control_port=$((base_control_port+i))
	if [ ! -d "data/tor$i" ]; then
		echo "Creating directory data/tor$i"
		mkdir "data/tor$i"
	fi
	# Take into account that authentication for the control port is disabled. Must be used in secure and controlled environments

	echo "Running: tor --RunAsDaemon 1 --CookieAuthentication 0 --HashedControlPassword \"\" --ControlPort $control_port --PidFile tor$i.pid --SocksPort $socks_port --DataDirectory data/tor$i"

	tor --RunAsDaemon 1 --CookieAuthentication 0 --HashedControlPassword "" --ControlPort $control_port --PidFile tor$i.pid --SocksPort $socks_port --DataDirectory data/tor$i
done

Python Script

The script below stores its results in MongoDB, in the “scraping” database under the “imdb.ratings” collection. To adjust the number of simultaneous workers change the “Discovery.NWorkers” variable. Note that the number of workers must be equal to or less than the number of Tor instances.

#!/usr/bin/python

import httplib
import socks
import urllib2
from Queue import Queue
from threading import Thread, Condition, Lock
from threading import active_count as threading_active_count

import time
from pymongo import Connection
import pymongo

url_format = 'http://www.imdb.com/user/ur{0}/ratings'

http_codes_counter = {}

MONGODB_HOSTNAME = '192.168.0.118'

"""
https://gist.github.com/869791

SocksiPy + urllib handler

version: 0.2
author: e

This module provides a Handler which you can use with urllib2 to allow it to tunnel your connection through a socks.sockssocket socket, without monkey patching the original socket...
"""

class SocksiPyConnection(httplib.HTTPConnection):
    def __init__(self, proxytype, proxyaddr, proxyport = None, rdns = True, username = None, password = None, *args, **kwargs):
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPConnection.__init__(self, *args, **kwargs)

    def connect(self):
        self.sock = socks.socksocket()
        self.sock.setproxy(*self.proxyargs)
        if isinstance(self.timeout, float):
            self.sock.settimeout(self.timeout)
        self.sock.connect((self.host, self.port))

class SocksiPyHandler(urllib2.HTTPHandler):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)

    def http_open(self, req):
        def build(host, port=None, strict=None, timeout=0):
            conn = SocksiPyConnection(*self.args, host=host, port=port, strict=strict, timeout=timeout, **self.kw)
            return conn
        return self.do_open(build, req)

class Monitor(Thread):
	def __init__(self, queue, discovery):
		Thread.__init__(self)
		self.queue = queue
		self.discovery = discovery
		self.finish_signal = False

	def finish(self):
		self.finish_signal = True

	def run(self):
		while not self.finish_signal:
			time.sleep(5)
			print "Elements in Queue:", self.queue.qsize(), "Active Threads:", threading_active_count(), "Exceptions Counter:", self.discovery.exception_counter

class Worker(Thread):
	def __init__(self, queue, discovery, socks_proxy_port):
		Thread.__init__(self)
		self.queue = queue
		self.discovery = discovery
		self.socks_proxy_port = socks_proxy_port
		self.opener = urllib2.build_opener(SocksiPyHandler(socks.PROXY_TYPE_SOCKS4, 'localhost', self.socks_proxy_port))
		self.conn = Connection(MONGODB_HOSTNAME, 27017)
		self.db = self.conn.scraping
		self.coll = self.db.imdb.ratings

	def get_url(self, url):
		try:
			#h = urllib2.urlopen(url)
			h = self.opener.open(url)

			return h.getcode()

		except urllib2.URLError, e:
			return e.code

	def run(self):
		while True:
			try:
				index = self.queue.get()

				if index == None:
					self.queue.put(None) # Notify the next worker
					break

				url = url_format.format(index)

				code = self.get_url(url)

				self.coll.update({'index':index}, {'$set': {'last_response':code}})

				self.discovery.lock.acquire()
				self.discovery.records_to_process -= 1
				if self.discovery.records_to_process == 0:
					self.discovery.lock.notify()
				self.discovery.lock.release()

			except (socks.Socks4Error, httplib.BadStatusLine), e:
				# TypeError: 'Socks4Error' object is not callable
				print e
				self.discovery.exception_counter_lock.acquire()
				self.discovery.exception_counter += 1
				self.discovery.exception_counter_lock.release()
				pass # leave this element for the next cycle

			time.sleep(1.5)

class Croupier(Thread):
	Base = 0
	Top = 25000000
	#Top = 1000
	def __init__(self, queue, discovery):
		Thread.__init__(self)
		self.conn = Connection(MONGODB_HOSTNAME, 27017)
		self.db = self.conn.scraping
		self.coll = self.db.imdb.ratings
		self.finish_signal = False
		self.queue = queue
		self.discovery = discovery
		self.discovery.records_to_process = 0

	def run(self):
		# Look if imdb collection is empty. Only if its empty we create all the items
		c = self.coll.count()
		if c == 0:
			print "Inserting items"
			self.coll.ensure_index([('index', pymongo.ASCENDING), ('last_response', pymongo.ASCENDING)])
			for i in xrange(Croupier.Base, Croupier.Top):
				self.coll.insert({'index':i, 'url': url_format.format(i), 'last_response': 0})

		else:
			print "Using #", c, " persisted items"

		while True:
			#items = self.coll.find({'last_response': {'$ne': 200}})
			items = self.coll.find({'$and': [{'last_response': {'$ne': 200}}, {'last_response' : {'$ne': 404}}]}, timeout = False)

			self.discovery.records_to_process = items.count()

			if self.discovery.records_to_process == 0:
				break

			for item in items:
				self.queue.put(item['index'])

			# Wait until the last item is updated on the db
			self.discovery.lock.acquire()
			while self.discovery.records_to_process != 0:
				self.discovery.lock.wait()
			self.discovery.lock.release()

#			time.sleep(5)

		# Send a 'signal' to workers to finish
		self.queue.put(None)

	def finish(self):
		self.finish_signal = True

class Discovery:
	NWorkers = 71
	SocksProxyBasePort = 9050
	Contention = 10000

	def __init__(self):
		self.queue = Queue(Discovery.Contention)
		self.workers = []
		self.lock = Condition()
		self.exception_counter_lock = Lock()
		self.records_to_process = 0
		self.exception_counter = 0

	def start(self):
		croupier = Croupier(self.queue, self)
		croupier.start()

		for i in range(Discovery.NWorkers):
			worker = Worker(self.queue, self, Discovery.SocksProxyBasePort + i)
			self.workers.append(worker)

		for w in self.workers:
			w.start()

		monitor = Monitor(self.queue, self)
		monitor.start()

		for w in self.workers:
			w.join()

		croupier.join()

		print "Queue finished with:", self.queue.qsize(), "elements"

		monitor.finish()

def main():
	discovery = Discovery()
	discovery.start()

if __name__ == '__main__':
	main()

#
# MISC NOTES
#
# - How many IMDB ratings pages are currently indexed by Google? query: inurl:www.imdb.com/user/*/ratings
# - [pymongo] cursor id '239432858681488351' not valid at server Options: http://groups.google.com/group/mongodb-user/browse_thread/thread/4ed6e3d77fb1c2cf?pli=1
#     That error generally means that the cursor timed out on the server -
#     this could be the case if you are performing a long running operation
#     while iterating over the cursor. The best bet is probably to turn off
#     the timeout by passing "timeout=False" in your call to find:
#

This script will gather users with public ratings, which you can list with the following MongoDB query: db.imdb.ratings.find({'last_response': 200}).
Then try exporting the movie ratings. This is the easiest part, because each ratings list is available as a comma separated value file and you do not need any XPath queries.

Additional observations

  1. We are not just using MongoDB because it is fancy, but also because it is very practical for quickly prototyping and persisting data along the way. The well-known “global lock” limitation on MongoDB (and many other databases) does not significantly affect its ability to efficiently store data.
  2. We use SocksiPy to allow us to use different proxies at the same time.
  3. If you are serious about using Tor to build a distributed infrastructure you might consider running Tor proxies on AWS EC2 instances as needed.
  4. Do not forget to run Tor instances in a secure environment since the control port is open to everyone without authentication.
  5. Our solution is easily scalable.
  6. If you get many 503 return codes, try balancing the quantity of proxies and delaying each worker’s activity.

See Also

  1. Running Your Own Anonymous Rotating Proxies
  2. Web Scraping Ajax and Javascript Sites
  3. Luminati Distributed Web Crawling

Resources

  1. An Improved Algorithm for Tor Circuit Scheduling
  2. How Much Anonymity does Network Latency Leak?
  3. StackOverflow Tor Questions
  4. New IMDB Ratings Breaks Everything
  5. Distributed Harvesting and Scraping

Automatically Tracking Events with Google Analytics, jQuery and jsUri

Pragmatic Code

Google Analytics can track user events on a web page. This article shows a code snippet that automates the insertion of tracking code: instead of adding tracking codes manually one tag at a time, we bind the code to the click event automatically. We opted not to use the plugins available for libraries such as jQuery or applications such as WordPress so as to have full control over the process. Since multiple interactions can take place on a single page, it is essential to add tracking codes to log user interactions. Tracking codes are also needed to track clicks on links to external sites.

jsUri is a robust library for parsing URIs, since Javascript implementations sadly do not include a native parsing function (only a trick).

This is how we implemented it on our Data Big Bang blog to track clicks to other sites:

<!-- Inside <head> -->
<script type='text/javascript' src='http://www.databigbang.com/js/jquery-1.7.min.js?ver=1.7.0'></script>
<script type='text/javascript' src='http://www.databigbang.com/js/jsuri-1.1.1.min.js?ver=1.1.1'></script>

<!-- After <body> -->
<script type="text/javascript">
	// Track click on hyperlinks to external sites
	$(document).ready(function() {
		$('a').click(function(event) {
			var target = event.target;
			var uri = new Uri(target);
			if(uri.host() != 'www.databigbang.com' && uri.host() != 'blog.databigbang.com') {
				//alert('Match!'); // Only for debugging
				_gaq.push(['_trackEvent', 'UI', 'Click', target.toString(), 0, true]);
			}
		});
	});
</script>

This is how we configure WordPress to load the libraries automatically: edit functions.php under the theme folder.

if( !is_admin()) {
	wp_deregister_script('jquery');

#	Avoid retrieving jquery libs from ajax.googleapis.com since Google domains can be blocked in countries like China.
#	wp_register_script('jquery', ("http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"), false, "1.7.0");
	wp_register_script('jquery', ('http://www.databigbang.com/js/jquery-1.7.min.js'), false, '1.7.0');

	wp_enqueue_script('jquery');

	wp_deregister_script('jsuri');
	wp_register_script('jsuri', ("http://www.databigbang.com/js/jsuri-1.1.1.min.js"), false, "1.1.1");
	wp_enqueue_script('jsuri');
}

This is how we implemented it on our secure coupon codes generator site to track clicks on a rich web application.

<!-- Inside <head> -->
<script type='text/javascript' src='http://www.databigbang.com/js/jquery-1.7.min.js?ver=1.7.0'></script>
<script type='text/javascript' src='http://www.databigbang.com/js/jsuri-1.1.1.min.js?ver=1.1.1'></script>	

<!-- After <body> -->
$(document).ready(function() {
	// Add Event Trackers
	$('a').click(function(event) {
		var target = event.target;
		var uri = new Uri(target.href);

		if(uri.host() == 'www.securecouponcodes.com') {
			//alert('match link');

			_gaq.push(['_trackEvent', 'UI', 'Click', target.href, 0, true]);

		}
	});

	$('button').click(function(event) {
		var target = event.target;
		//alert('match button');

		_gaq.push(['_trackEvent', 'UI', 'Click', target.innerText, 0, true]);
	});
});

Resources

  1. How do I parse a URL into hostname and path in javascript?
  2. Event Tracking Guide
  3. Is Google’s CDN for jQuery available in China?