
The Record Linkage ToolKit (RLTK) is a general-purpose open-source record linkage platform that allows users to build powerful Python programs that link records referring to the same underlying entity. Record linkage is an extremely important problem that shows up in domains ranging from social networks to bibliographic data and biomedicine. Current open platforms for record linkage have problems scaling even to moderately sized datasets, or are just not easy to use (even for experts). RLTK attempts to address all of these issues.

RLTK supports a full, scalable record linkage pipeline, including multi-core algorithms for blocking, profiling data, computing a wide variety of features, and training and applying machine learning classifiers based on Python’s sklearn library. An end-to-end RLTK pipeline can be jump-started with only a few lines of code. However, RLTK is also designed to be extensible and customizable, allowing users arbitrary degrees of control over many of the individual components. You can add new features to RLTK (e.g. a custom string similarity) very easily.

RLTK is being built by the Center on Knowledge Graphs at USC/ISI, with funding from multiple projects funded by the DARPA LORELEI and MEMEX programs and the IARPA CAUSE program. RLTK is under active maintenance and we expect to keep adding new features and state-of-the-art record linkage algorithms in the foreseeable future, in addition to continuously supporting our adopters to integrate the platform into their applications.

Getting Started

Installation (make sure prerequisites are installed):

pip install -U rltk

Example:

>>> import rltk
>>> rltk.levenshtein_distance('abc', 'abd')
1

Tutorial

Installation

Note

RLTK only supports Python 3 and is tested under Python 3.7+.

pip

Using pip to install:

pip install rltk

If you want to update RLTK:

pip install -U rltk

Generally, it’s recommended to install packages in a virtual environment:

virtualenv rltk_env
source rltk_env/bin/activate
pip install rltk

Install from source

The other way to install RLTK is to clone the GitHub repository and build it from source:

git clone https://github.com/usc-isi-i2/rltk.git
cd rltk

virtualenv rltk_env
source rltk_env/bin/activate
pip install -e .

Run tests

RLTK uses pytest for unit tests. To run them, simply run the following command from the root of the rltk package:

pytest

If you need more detailed information, do:

pytest -v --color=yes

Build documentation

Additional dependencies for building documentation should be installed first:

pip install -r requirements_docs.txt

Documentation is powered by Sphinx. To generate it locally, run:

cd docs
make html # the generated doc is located at _build/html/index.html

Overview

What’s Record Linkage

Record linkage (RL) is the task of finding records in a data set that refer to the same entity across different data sources (e.g., data files, books, websites, and databases). – Wikipedia

Assume we have the following two tables:

_images/overview-tables.png

Both tables contain the identical id A04, so these two rows refer to the same entity. We then know that “David’s id is A04 and he is male”.

But real world situations are more complex:

  • Typos: “Joh” vs “John”
  • OCR errors: “J0hn” vs “John”
  • Formatting conventions: “03/17” vs “March 17”
  • Abbreviations: “J. K. Rowling” vs “Joanne Rowling”
  • Nicknames: “John” vs “Jock”
  • Word order: “Sargent, John S.” vs “John S. Sargent”

The Record Linkage ToolKit (RLTK) is designed to provide an easy-to-use, scalable and extensible way of resolving these problems.
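
For instance, a single string metric already tolerates the OCR error above:

>>> import rltk
>>> rltk.levenshtein_distance('J0hn', 'John')
1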

Basic Components & Data Flow

The first step in using RLTK is to create a Dataset.

_images/overview-dataflow.png

In RLTK, every “row” in a table is called a Record. Note that “row” here is a logical concept: for a CSV file it’s a CSV record (which can span multiple lines); for a JSON Lines file, it’s one line; for a query against a DBMS, it’s one row; for a self-defined stream, it’s a minimal entity.

A Dataset is a “table”, or more precisely, a collection of Records.

Reader is used to handle heterogeneous input. For each “row”, the Reader returns a Python dictionary named raw_object (shown as the grey {...} in the figure). A Record is generated from the raw_object: users need to extend the base Record class and add properties for later use.

KeyValueAdapter is where all Records are stored. It can be either in-memory or persistent.

So the data flow is: to create a Dataset, a Reader reads from the input source and converts it, entity by entity, into raw_objects, which are used to construct Records; the Records are then stored in a KeyValueAdapter.

Generating Records can be really time consuming if the dataset is large, but if the concrete KeyValueAdapter is a persistent one (e.g., HBaseKeyValueAdapter), the Dataset can later be loaded directly from that KeyValueAdapter instead of being regenerated from the raw input.
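
For example, here is a rough sketch of this reuse, assuming the Record1 class and ds1.csv file from the snippet below and the persistent DbmKeyValueAdapter; the reload call follows the rltk.Dataset(adapter=...) form mentioned in the notes of the real-world example:

# first run: read the raw input once and persist the generated Records
ds1 = rltk.Dataset(reader=rltk.CSVReader('ds1.csv'),
                   record_class=Record1,
                   adapter=rltk.DbmKeyValueAdapter('ds1_index'))

# later runs: skip the Reader and load the Dataset straight from the stored adapter
ds1 = rltk.Dataset(adapter=rltk.DbmKeyValueAdapter('ds1_index'))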

Minimal Workflow & Implementation

Now we have two Datasets and we need to find matching pairs (for a de-duplication problem, only one Dataset is needed).

_images/overview-basic-workflow.png

Simply use RLTK’s API to get all possible candidate pairs and implement your own “magical function” (shown as is_pair() in the figure) to decide whether two Records are the same.

Let’s look at the example input datasets and a minimal implementation.

_images/overview-inputs.png
import rltk

class Record1(rltk.Record):
   @property
   def id(self):
      return self.raw_object['doc_id']

   @property
   def value(self):
      return self.raw_object['doc_value']

class Record2(rltk.Record):
   @rltk.cached_property
   def id(self):
      return self.raw_object['ident']

   @rltk.cached_property
   def value(self):
      v = self.raw_object.get('values', list())
      return v[0] if len(v) > 0 else 'empty'


ds1 = rltk.Dataset(reader=rltk.CSVReader('ds1.csv'),
                     record_class=Record1, adapter=rltk.MemoryKeyValueAdapter())
ds2 = rltk.Dataset(reader=rltk.JsonLinesReader('ds2.jl'),
                     record_class=Record2, adapter=rltk.DbmKeyValueAdapter('file_index'))

pairs = rltk.get_record_pairs(ds1, ds2)
for r1, r2 in pairs:
   print('-------------')
   print(r1.id, r1.value, '\t', r2.id, r2.value)
   print('levenshtein_distance:', rltk.levenshtein_distance(r1.value, r2.value))
   print('levenshtein_similarity:', rltk.levenshtein_similarity(r1.value, r2.value))

One thing to notice here: a property in a Record class can be decorated with @property, or with @rltk.cached_property, which pre-computes the value instead of computing it at runtime.

For the “magical function”, you can use any method that makes sense: hand-crafted rules, a machine learning model, etc. RLTK provides a lot of similarity metrics which can be very helpful when comparing records.
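
For instance, a minimal hand-crafted is_pair() could be built on one of these metrics (a sketch, assuming Record classes with a value property like the ones above; the 0.8 threshold is purely illustrative):

def is_pair(r1, r2):
    # records whose values are close in edit distance are treated as a match
    return rltk.levenshtein_similarity(r1.value, r2.value) > 0.8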

Evaluation

After designing the “magical function”, you need a way to judge its performance. RLTK has a built-in package called Evaluation which includes three basic components:

  • GroundTruth: Ground truth data.
  • Trial: Stores the prediction results for candidate pairs.
  • Evaluation: Visualizes the evaluation results when multiple trials are given.
_images/overview-evaluation-workflow.png

As can be seen from the figure, every Trial has a corresponding GroundTruth, and the GroundTruth needs to be provided when generating candidate pairs. Add prediction results to the Trial if they need to be evaluated later, then call evaluate() to evaluate the Trial against the GroundTruth.

gt = rltk.GroundTruth()
gt.load('gt.csv')
eva = rltk.Evaluation()
trial = rltk.Trial(ground_truth=gt)

test_pairs = rltk.get_record_pairs(ds1, ds2, ground_truth=gt)
for r1, r2 in test_pairs:
   is_positive = is_pair(r1, r2)
   trial.add_result(r1, r2, is_positive)

trial.evaluate()
print(trial.true_positives, trial.false_positives, trial.true_negatives, trial.false_negatives,
       trial.precision, trial.recall, trial.f_measure)

Note that add_positive() and add_negative() are just syntactic sugar for the add_result() call used in the code snippet above.
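
In other words, for the trial, r1 and r2 above, these two shortcuts behave like add_result() with a fixed label:

trial.add_positive(r1, r2)   # same as trial.add_result(r1, r2, True)
trial.add_negative(r1, r2)   # same as trial.add_result(r1, r2, False)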

Blocking

When finding pairs between two datasets, how many comparisons are made in total?

Let’s say the 1st dataset has M items and the 2nd has N; then it needs M*N comparisons. If M=10,000 and N=100,000, M*N=1,000,000,000. If the computer can evaluate a heavy is_pair() in 0.001s, in total it costs 1 billion x 0.001s / 60 / 60 / 24 = 11.57 days. Apparently exhaustive comparison is not a good choice. Blocking was invented to tackle this problem. Blocking attempts to restrict comparisons to just those records for which one or more particularly discriminating identifiers agree, which has the effect of increasing the positive predictive value (precision) at the expense of sensitivity (recall).
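
A back-of-the-envelope check of the numbers above:

M, N = 10_000, 100_000
total_seconds = M * N * 0.001          # 1,000,000,000 comparisons at 0.001s each
print(total_seconds / 60 / 60 / 24)    # ~11.57 days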

_images/overview-blocking-tables.png

For example: a full comparison (cross product) of the two tables shown in the figure takes 12 comparisons. After inspection, it’s obvious that “last name” can be used as the blocking key (records are grouped by this key), since people with different last names can’t be the same person. The total number of comparisons then drops to 3.

Blocks need to be computed and passed in when generating candidate pairs. Computing blocks can be time consuming, so RLTK supports dumping them to disk for later reuse.

_images/overview-blocking-workflow.png
def get_first_name(r2):
   return r2.full_name.split(' ')[0]

bg = rltk.HashBlockGenerator()
block = bg.generate(
               bg.block(ds1, property_='first_name'),
               bg.block(ds2, function_=get_first_name))
pairs = rltk.get_record_pairs(ds1, ds2, block=block)
for r1, r2 in pairs:
    print(r1.id, r1.full_name, '\t', r2.id, r2.full_name)

Summary

Now you should know the goal of record linkage, how to construct a Dataset and use it in an RLTK workflow, how to evaluate the quality of the linkage, and how to use blocking to deal with large datasets.

Step-by-step example

In this tutorial, you will work through an example end to end. Here are the main steps:

  • Dataset analysis
  • Construct RLTK datasets
  • Blocking
  • Pairwise comparison
  • Evaluation

Dataset analysis

The dataset used here is an artificial dataset constructed from DBLP and Scholar data. Let’s take a look.

[1]:
# initialization
import os
from datetime import datetime
import pandas as pd
from IPython.display import display
import rltk
[2]:
df_dblp = pd.read_csv('resources/dblp.csv', parse_dates=False)
df_dblp.head()
[2]:
id names date
0 journals/sigmod/HummerLW02 W Hümmer, W Lehner, H Wedekind 2018-12-24
1 conf/vldb/AgrawalS94 R Agrawal, R Srikant 2018-12-22
2 conf/vldb/Brin95 S Brin 2018-12-26
3 conf/vldb/ChakravarthyKAK94 S Chakravarthy, V Krishnaprasad, E Anwar, S Kim 2018-12-29
4 conf/vldb/MedianoCD94 M Mediano, M Casanova, M Dreux 2018-12-26
[3]:
df_scholar = pd.read_json('resources/scholar.jl', lines=True, convert_dates=False)
df_scholar.head()
[3]:
date id names
0 26, Dec 2018 ek26aiEheesJ M Fernandez, J Kang, A Levy, D Suciu
1 29, Dec 2018 rmtEGXAXHKIJ S Adali, KS Candan, Y Papakonstantinou, VS
2 27, Dec 2018 D0z0BDnbnFcJ S Christodoulakis
3 28, Dec 2018 noTo81QxmHQJ ACMS Anthology, P Edition
4 28, Dec 2018 l0W27c1C3NwJ W Litwin, MA Neimat, DA Schneider

At a glance, it’s easy to see that both datasets have id, date and names.

  • Dates have different formats.
  • The names column contains multiple names separated by commas.

Construct RLTK datasets

In RLTK, the data collection is named Dataset and each “row” is a Record instance. In order to construct a Dataset, you need to read data from a source with a specific Reader; the data is then presented as a Python dict raw_object, which is used to construct a Record instance according to the schema (concrete Record class) you defined.

For DBLP:

[4]:
class DBLP(rltk.Record):
    @property
    def id(self):
        return self.raw_object['id']

    @property
    def date(self):
        return self.raw_object['date']

    @property
    def names(self):
        return list(map(lambda x: x.strip(), self.raw_object['names'].split(',')))
[5]:
ds_dblp = rltk.Dataset(rltk.CSVReader('resources/dblp.csv'), record_class=DBLP)

for r_dblp in ds_dblp.head():
    print(r_dblp.id, r_dblp.date, r_dblp.names)
journals/sigmod/HummerLW02 2018-12-24 ['W Hümmer', 'W Lehner', 'H Wedekind']
conf/vldb/AgrawalS94 2018-12-22 ['R Agrawal', 'R Srikant']
conf/vldb/Brin95 2018-12-26 ['S Brin']
conf/vldb/ChakravarthyKAK94 2018-12-29 ['S Chakravarthy', 'V Krishnaprasad', 'E Anwar', 'S Kim']
conf/vldb/MedianoCD94 2018-12-26 ['M Mediano', 'M Casanova', 'M Dreux']
conf/vldb/SistlaYH94 2018-12-25 ['A Sistla', 'C Yu', 'R Haddad']
journals/sigmod/PourabbasR00 2018-12-20 ['E Pourabbas', 'M Rafanelli']
conf/sigmod/MelnikRB03 2018-12-21 ['S Melnik', 'E Rahm', 'P Bernstein']
conf/sigmod/ZhangDWEMPMDR03 2018-12-26 ['X Zhang', 'K Dimitrova', 'L Wang', 'M El-Sayed', 'B Murphy', 'B Pielech', 'M Mulchandani', 'L Ding', 'E Rundensteiner']
conf/sigmod/ZhouWGGZWXYF03 2018-12-27 ['A Zhou', 'Q Wang', 'Z Guo', 'X Gong', 'S Zheng', 'H Wu', 'J Xiao', 'K Yue', 'W Fan']

For scholar:

[6]:
@rltk.remove_raw_object
class Scholar(rltk.Record):
    @rltk.cached_property
    def id(self):
        return self.raw_object['id']

    @rltk.cached_property
    def date(self):
        return datetime.strptime(self.raw_object['date'], '%d, %b %Y').date().strftime('%Y-%m-%d')

    @rltk.cached_property
    def names(self):
        return list(map(lambda x: x.strip(), self.raw_object['names'].split(',')))
[7]:
ds_scholar = rltk.Dataset(rltk.JsonLinesReader('resources/scholar.jl'), record_class=Scholar)

for r_scholar in ds_scholar.head():
    print(r_scholar.id, r_scholar.date, r_scholar.names)
ek26aiEheesJ 2018-12-26 ['M Fernandez', 'J Kang', 'A Levy', 'D Suciu']
rmtEGXAXHKIJ 2018-12-29 ['S Adali', 'KS Candan', 'Y Papakonstantinou', 'VS']
D0z0BDnbnFcJ 2018-12-27 ['S Christodoulakis']
noTo81QxmHQJ 2018-12-28 ['ACMS Anthology', 'P Edition']
l0W27c1C3NwJ 2018-12-28 ['W Litwin', 'MA Neimat', 'DA Schneider']
IkNOhDqEY18J 2018-12-26 ['S Acharya', 'PB Gibbons']
6QZGeKna5lgJ 2018-12-23 ['T Gri']
XFCkL9QhTjIJ 2018-12-25 ['K Koperski', 'J Han']
9Wo54Wyh_X8J 2018-12-23 ['H Garcia-Molina', 'S Raghavan']
9uxj2XzGt9UJ 2018-12-28 ['M Flickner', 'H Sawhney', 'W Niblack', 'J Ashley', 'Q']

The cached_property decorator means the property value will be pre-computed while the Dataset is generated; it’s especially useful to cache the value when the property’s transformation is time consuming (e.g., tokenization, vectorization). remove_raw_object is used to release the space taken by raw_object after all properties have been cached.

If you prefer to do data cleaning and manipulation in a pandas.DataFrame, you can build a Dataset from it easily.

[8]:
# do data transformation in df_scholar first, then:

class Scholar2(rltk.AutoGeneratedRecord):
    pass

ds_scholar2 = rltk.Dataset(rltk.DataFrameReader(df_scholar), record_class=Scholar2)

for r_scholar2 in ds_scholar2.head():
    print(r_scholar2.id, r_scholar2.date, r_scholar2.names)
ek26aiEheesJ 26, Dec 2018 M Fernandez, J Kang, A Levy, D Suciu
rmtEGXAXHKIJ 29, Dec 2018 S Adali, KS Candan, Y Papakonstantinou, VS
D0z0BDnbnFcJ 27, Dec 2018 S Christodoulakis
noTo81QxmHQJ 28, Dec 2018 ACMS Anthology, P Edition
l0W27c1C3NwJ 28, Dec 2018 W Litwin, MA Neimat, DA Schneider
IkNOhDqEY18J 26, Dec 2018 S Acharya, PB Gibbons
6QZGeKna5lgJ 23, Dec 2018 T Gri
XFCkL9QhTjIJ 25, Dec 2018 K Koperski, J Han
9Wo54Wyh_X8J 23, Dec 2018 H Garcia-Molina, S Raghavan
9uxj2XzGt9UJ 28, Dec 2018 M Flickner, H Sawhney, W Niblack, J Ashley, Q

Blocking

Blocking can be used to eliminate obviously impossible pairs and thereby greatly reduce unnecessary comparisons.

In this example, date is an ideal key for blocking.

[9]:
bg = rltk.HashBlockGenerator()
block = bg.generate(
    bg.block(ds_dblp, property_='date'),
    bg.block(ds_scholar, property_='date')
)

If you want to know what’s in a block aggregated by key, you can iterate over the key_set_adapter of the block object. The block is stored in a concrete KeySetAdapter (the default is MemoryKeySetAdapter).

[10]:
for idx, b in enumerate(block.key_set_adapter):
    if idx == 5: break
    print(b)
('2018-12-24', {('Scholar', 'BTalXWt3faUJ'), ('Scholar', 'bTYTn8VG5hIJ'), ('Scholar', 'sHJ914nPZtUJ'), ('DBLP', 'conf/sigmod/CherniackZ96'), ('Scholar', 'c9Humx2-EMgJ'), ('Scholar', 'YMcmy4FOXi8J'), ('Scholar', 'W1IcM8IUwAEJ'), ('DBLP', 'journals/sigmod/Yang94'), ('Scholar', 'wLNJcNvsulkJ'), ('DBLP', 'journals/sigmod/BohmR94'), ('DBLP', 'conf/sigmod/SimmenSM96'), ('DBLP', 'conf/sigmod/TatarinovIHW01'), ('DBLP', 'journals/vldb/BarbaraI95'), ('DBLP', 'conf/vldb/RohmBSS02'), ('Scholar', 'XVP8s4K0Bg4J'), ('Scholar', 'jfkafZcMjgIJ'), ('DBLP', 'conf/vldb/CosleyLP02'), ('DBLP', 'journals/sigmod/HummerLW02')})
('2018-12-22', {('DBLP', 'journals/sigmod/SilberschatzSU96'), ('Scholar', 'ckrgSn0vBOMJ'), ('Scholar', 'cIJQ0qxrkMIJ'), ('Scholar', 'ZnWLup8HMkUJ'), ('DBLP', 'journals/tods/StolboushkinT98'), ('Scholar', '-iaSLKFHwUkJ'), ('DBLP', 'journals/tods/FernandezKSMT02'), ('Scholar', 'soiN2U4tXykJ'), ('Scholar', 'x4HkJDEYFmYJ'), ('DBLP', 'journals/tods/FranklinCL97'), ('DBLP', 'conf/vldb/AgrawalS94'), ('DBLP', 'conf/sigmod/GibbonsM98')})
('2018-12-26', {('DBLP', 'conf/vldb/RothS97'), ('DBLP', 'journals/sigmod/DogacDKOONEHAKKM95'), ('DBLP', 'conf/sigmod/ZhangDWEMPMDR03'), ('Scholar', 'ek26aiEheesJ'), ('Scholar', '1hkVjoUg8hUJ'), ('Scholar', 'F2ecYx97F2sJ'), ('DBLP', 'journals/vldb/LiR99'), ('Scholar', 'rDObsYKVroMJ'), ('DBLP', 'conf/sigmod/AcharyaGPR99a'), ('Scholar', 'IkNOhDqEY18J'), ('Scholar', 'fXziEl_Htv8J'), ('Scholar', 'LxyVmHubIfUJ'), ('DBLP', 'conf/sigmod/FernandezFKLS97'), ('Scholar', 'qwjRkZuiMHsJ'), ('DBLP', 'conf/sigmod/NybergBCGL94'), ('DBLP', 'conf/sigmod/BreunigKKS01'), ('DBLP', 'conf/sigmod/LometW98'), ('DBLP', 'conf/vldb/MedianoCD94'), ('Scholar', 'Ko9e8CH2Si4J'), ('Scholar', 'DwwSuaisX5QJ'), ('Scholar', 'oAO74aolStoJ'), ('Scholar', 'jXvsW6VxbMYJ'), ('DBLP', 'conf/vldb/Brin95')})
('2018-12-29', {('Scholar', 'Ph7ZpmdNOPEJ'), ('Scholar', 'OmYc0wE1j4kJ'), ('DBLP', 'journals/sigmod/SouzaS99'), ('Scholar', 'rmtEGXAXHKIJ'), ('Scholar', '3M_0Kd8NNjgJ'), ('DBLP', 'conf/vldb/ChakravarthyKAK94'), ('DBLP', 'conf/sigmod/AdaliCPS96'), ('Scholar', 'tbZ0J3HLI18J'), ('DBLP', 'journals/sigmod/KappelR98'), ('DBLP', 'conf/vldb/MeccaCM01')})
('2018-12-25', {('Scholar', 'f1wgD54UUKwJ'), ('Scholar', 'RusJdYPDgQ4J'), ('Scholar', 'zkbTv93Zp1UJ'), ('Scholar', 'S8x6zjXc9oAJ'), ('Scholar', '0aJOXauNqYIJ'), ('Scholar', 'XFCkL9QhTjIJ'), ('DBLP', 'conf/vldb/SistlaYH94'), ('DBLP', 'conf/sigmod/HanKS97'), ('Scholar', 'xF8s5N7oUIMJ'), ('DBLP', 'journals/sigmod/FlorescuLM98'), ('Scholar', '_jl3bN2QlE4J'), ('Scholar', '0HlMHEPJRH4J'), ('DBLP', 'conf/sigmod/MamoulisP99'), ('DBLP', 'conf/sigmod/TatarinovVBSSZ02'), ('DBLP', 'conf/vldb/DeutschPT99'), ('DBLP', 'journals/tods/CliffordDIJS97'), ('DBLP', 'journals/vldb/HarrisR96'), ('DBLP', 'conf/sigmod/HuangSW94')})

Pairwise comparison

Now let’s find the real pairs among all candidate pairs.

First of all, you need to figure out how to compare two records.

[11]:
def is_pair(r1, r2):
    for n1, n2 in zip(sorted(r1.names), sorted(r2.names)):
        if rltk.levenshtein_distance(n1, n2) > min(len(n1), len(n2)) / 3:
            return False
    return True

Then, compare the candidate pairs (to restrict them to pairs generated within blocks, pass block=block to get_record_pairs, as shown earlier).

[12]:
for r_dblp, r_scholar in rltk.get_record_pairs(ds_dblp, ds_scholar):
    if is_pair(r_dblp, r_scholar):
        print(r_dblp.names, r_scholar.names)
['W Hümmer', 'W Lehner', 'H Wedekind'] ['W Huemmer', 'W Lehner', 'H Wedekind']
['R Agrawal', 'R Srikant'] ['R Sfikant', 'R Agrawal']
['S Brin'] ['S Brin']
['S Chakravarthy', 'V Krishnaprasad', 'E Anwar', 'S Kim'] ['S Chakravarthy', 'V Krishnaprasad', 'E Anwar', 'SK Kim']
['A Sistla', 'C Yu', 'R Haddad'] ['AP Sistla', 'CT Yu', 'R Haddad']
['E Pourabbas', 'M Rafanelli'] ['E Pourabbas', 'M Rafanelli']
['S Melnik', 'E Rahm', 'P Bernstein'] ['S Melnik', 'E Rahm', 'PA Bernstein']
['S Melnik', 'E Rahm', 'P Bernstein'] ['E Rahm']
['L Libkin'] ['L Libkin']
['I Tatarinov', 'S Viglas', 'K Beyer', 'J Shanmugasundaram', 'E Shekita', 'C Zhang'] ['X Zhang']
['J Gray', 'G Graefe'] ['J Gray', 'G Graefe']
['D Florescu', 'A Levy', 'A Mendelzon'] ['F Levy']
['G Kappel', 'W Retschitzegger'] ['G Kappel', 'W Retschitzegger']
['I Tatarinov', 'Z Ives', 'A Halevy', 'D Weld'] ['I Tatarinov', 'ZG Ives', 'AY Halevy', 'DS Weld']
['A Silberschatz', 'M Stonebraker', 'J Ullman'] ['A Silberschatz', 'M Stonebraker', 'J Ullman']
['R Baeza-Yates', 'G Navarro'] ['R Baeza-Yates', 'G Navarro']
['P Buneman', 'L Raschid', 'J Ullman'] ['P Buneman', 'L Raschid', 'JD Ullman']
['K Böhm', 'T Rakow'] ['K Bohme', 'TC Rakow']
['H Darwen', 'C Date'] ['H Darwen', 'CJ Date']
['M Lee', 'M Kitsuregawa', 'B Ooi', 'K Tan', 'A Mondal'] ['ML Lee', 'M Kitsuregawa', 'BC Ooi', 'KL Tan', 'A Mondal']
['N Mamoulis', 'D Papadias'] ['N Mamoulis', 'D Papadias']
['S Acharya', 'P Gibbons', 'V Poosala', 'S Ramaswamy'] ['S Acharya', 'PB Gibbons']
['L Yang'] ['L Yang']
['G Manku', 'S Rajagopalan', 'B Lindsay'] ['GS Manku', 'S Rajagopalan', 'BG Lindsay']
['P Brown'] ['P Brown']
['D Lomet', 'G Weikum'] ['D Lomet', 'G Weikum']
['S Berchtold', 'D Keim'] ['S Berchtold', 'DA Keim']
['P Gibbons', 'Y Matias'] ['PB Gibbons', 'Y Matias']
['J Hellerstein', 'P Haas', 'H Wang'] ['JM Hellerstein', 'JP Haas', 'HJ Wang']
['J Hellerstein', 'P Haas', 'H Wang'] ['L Yang']
['B Adelberg', 'H Garcia-Molina', 'J Widom'] ['B Adelberg', 'H Garcia-Molina', 'J Widom']
['J Han', 'K Koperski', 'N Stefanovic'] ['K Koperski', 'J Han']
['D Simmen', 'E Shekita', 'T Malkemus'] ['DE Simmen', 'EJ Shekita', 'T Malkemus']
['M Fernandez', 'D Florescu', 'J Kang', 'A Levy', 'D Suciu'] ['F Levy']
['A Deutsch', 'L Popa', 'V Tannen'] ['A Deutsch', 'L Popa', 'V Tannen']
['K Mogi', 'M Kitsuregawa'] ['K Mogi', 'M Kitsuregawa']
['J Shanmugasundaram', 'K Tufte', 'C Zhang', 'G He', 'D DeWitt', 'J Naughton'] ['X Zhang']
['P Hung', 'H Yeung', 'K Karlapalem'] ['PCK Hung', 'HP Yeung', 'K Karlapalem']
['R Srikant', 'R Agrawal'] ['R Sfikant', 'R Agrawal']
['M Cherniack', 'S Zdonik'] ['M Chemiack', 'S Zdonik']
['G Gardarin', 'F Machuca', 'P Pucheral'] ['G Gardarin', 'F Machuca']
['T Griffin', 'L Libkin'] ['L Libkin']
['M Roth', 'P Schwarz'] ['PM Schwarz', 'MT Roth']
['D Srivastava', 'S Dar', 'H Jagadish', 'A Levy'] ['F Levy']
['D Srivastava', 'S Dar', 'H Jagadish', 'A Levy'] ['S Dar', 'HV Jagadish', 'AY Levy', 'D Srivastava']
['M Carey', 'D DeWitt'] ['MJ Carey', 'DJ DeWitt']
['K Sagonas', 'T Swift', 'D Warren'] ['K Sagonas', 'T Swift', 'DS Warren']
['V Raghavan'] ['V ay Raghavan']
['X Wang', 'M Cherniack'] ['X Wang', 'M Cherniack']
['M Petrovic', 'I Burcea', 'H Jacobsen'] ['M Petrovic', 'I Burcea', 'HA Jacobsen']
['S Raghavan', 'H Garcia-Molina'] ['H Garcia-Molina', 'S Raghavan']
['D Cosley', 'S Lawrence', 'D Pennock'] ['D Cosley', 'S Lawrence', 'DM Pennock']
['K Goldman', 'N Lynch'] ['KJ Goldman', 'N Lynch']
['S Guo', 'W Sun', 'M Weiss'] ['S Guo', 'W Sun', 'MA Weiss']
['W Litwin', 'M Neimat', 'D Schneider'] ['W Litwin', 'MA Neimat', 'DA Schneider']
['A Stolboushkin', 'M Taitslin'] ['AP Stolboushkin', 'MA Taitslin']
['V Verykios', 'G Moustakides', 'M Elfeky'] ['VS Verykios', 'GV Moustakides', 'MG Elfeky']
['C Lee', 'C Shih', 'Y Chen'] ['C Lee', 'CS Shih', 'YH Chen']
['E Rahm', 'P Bernstein'] ['S Melnik', 'E Rahm', 'PA Bernstein']
['E Rahm', 'P Bernstein'] ['E Rahm']
['S Sarawagi'] ['S Sarawagi']
['E Harris', 'K Ramamohanarao'] ['EP Harris', 'K Ramamohanarao']
['D Barbará', 'T Imielinski'] ['D Barbara', 'T Imielinski']
['A Dan', 'P Yu', 'J Chung'] ['A Dan', 'PS Yu', 'JY Chung']
['B Hammond'] ['B Hammond']

Evaluation

How do I know the performance of the strategy I used? Evaluation is a built-in module for benchmarking.

The first step is to label data to get ground truth.

[13]:
gt = rltk.GroundTruth()
with open('resources/dblp_scholar_gt.csv') as f:
    for d in rltk.CSVReader(f): # this can be replaced with Python's built-in csv reader
        gt.add_positive(d['idDBLP'], d['idScholar'])
gt.generate_all_negatives(ds_dblp, ds_scholar, range_in_gt=True)
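
As the comment suggests, Python's built-in csv module works just as well here; a quick sketch assuming the same gt object and ground-truth file:

import csv
import rltk

gt = rltk.GroundTruth()
with open('resources/dblp_scholar_gt.csv') as f:
    for d in csv.DictReader(f):  # one dict per row, keyed by the CSV header
        gt.add_positive(d['idDBLP'], d['idScholar'])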

Trial is used to record all the results for later evaluation. It needs an associated GroundTruth.

[14]:
trial = rltk.Trial(gt)
for r_dblp, r_scholar in rltk.get_record_pairs(ds_dblp, ds_scholar):
    if is_pair(r_dblp, r_scholar):
        trial.add_positive(r_dblp, r_scholar)
    else:
        trial.add_negative(r_dblp, r_scholar)
trial.evaluate()
print('precision:', trial.precision, 'recall:', trial.recall, 'f-measure:', trial.f_measure)
print('tp:', len(trial.true_positives_list))
print('fp:', len(trial.false_positives_list))
print('tn:', len(trial.true_negatives_list))
print('fn:', len(trial.false_negatives_list))
precision: 0.8615384615384616 recall: 0.5894736842105263 f-measure: 0.7
tp: 56
fp: 9
tn: 8824
fn: 39

Real world example

In this example, I’m going to show you how to use RLTK to solve a real problem.

Problem & Dataset analysis

The data used here is called Abt-Buy, which can be found here. Abt.com (Abt.csv) and Buy.com (Buy.csv) are two e-commerce retailers; the goal is to find all product matches (abt_buy_perfectMapping.csv) between these two files.

Let’s take a look at these files first. wc, less and grep are great tools to start with; then pandas or other data analysis tools / libraries can tell you more detailed information. Here’s what I will do:

[1]:
# initialization
import os
import pandas as pd
from IPython.display import display

# find rltk-experimentation
def find_file_path(from_dir, file_path, depth=5):
    if depth == 0:
        raise RecursionError('Maximum recursion depth exceeded')
    path = os.path.join(from_dir, file_path)
    if os.path.exists(path):
        return path
    return find_file_path(os.path.join(from_dir, '..'), file_path, depth-1)
os.chdir(find_file_path(os.getcwd(), 'rltk-experimentation'))
[2]:
def print_stats(fp):
    print(fp)
    df_data = pd.read_csv(fp, encoding='latin-1')

    print('\nfirst 5 rows:')
    display(df_data.head(5))

    stat = []
    for i in range(df_data.shape[1]):
        stat.append(df_data.shape[0] - df_data.iloc[:,i].isnull().sum())
    df_stat = pd.DataFrame([stat], columns=df_data.columns.values.tolist())
    df_stat.rename(index={0: 'total'}, inplace=True)
    print('\ntotal number of rows:')
    display(df_stat.head(1))
    print('\n')

print_stats('datasets/Abt-Buy/abt.csv')
print_stats('datasets/Abt-Buy/buy.csv')
print_stats('datasets/Abt-Buy/abt_buy_perfectMapping.csv')
datasets/Abt-Buy/abt.csv

first 5 rows:
id name description price
0 552 Sony Turntable - PSLX350H Sony Turntable - PSLX350H/ Belt Drive System/ ... NaN
1 580 Bose Acoustimass 5 Series III Speaker System -... Bose Acoustimass 5 Series III Speaker System -... $399.00
2 4696 Sony Switcher - SBV40S Sony Switcher - SBV40S/ Eliminates Disconnecti... $49.00
3 5644 Sony 5 Disc CD Player - CDPCE375 Sony 5 Disc CD Player- CDPCE375/ 5 Disc Change... NaN
4 6284 Bose 27028 161 Bookshelf Pair Speakers In Whit... Bose 161 Bookshelf Speakers In White - 161WH/ ... $158.00

total number of rows:
id name description price
total 1081 1081 1081 418


datasets/Abt-Buy/buy.csv

first 5 rows:
id name description manufacturer price
0 10011646 Linksys EtherFast EZXS88W Ethernet Switch - EZ... Linksys EtherFast 8-Port 10/100 Switch (New/Wo... LINKSYS NaN
1 10140760 Linksys EtherFast EZXS55W Ethernet Switch 5 x 10/100Base-TX LAN LINKSYS NaN
2 10221960 Netgear ProSafe FS105 Ethernet Switch - FS105NA NETGEAR FS105 Prosafe 5 Port 10/100 Desktop Sw... Netgear NaN
3 10246269 Belkin Pro Series High Integrity VGA/SVGA Moni... 1 x HD-15 - 1 x HD-15 - 10ft - Beige Belkin NaN
4 10315184 Netgear ProSafe JFS516 Ethernet Switch Netgear ProSafe 16 Port 10/100 Rackmount Switc... Netgear NaN

total number of rows:
id name description manufacturer price
total 1092 1092 651 1086 590


datasets/Abt-Buy/abt_buy_perfectMapping.csv

first 5 rows:
idAbt idBuy
0 38477 10011646
1 38475 10140760
2 33053 10221960
3 27248 10246269
4 25262 10315184

total number of rows:
idAbt idBuy
total 1097 1097


After a rough inspection, the summary is:

  • Abt
    • It has 1081 items; all items have id, name and description, but only 418 items have price.
    • The name seems to be formed in the pattern {product name} - {model}.
  • Buy
    • It has 1092 items; all items have id and name, 1086 items have manufacturer, and some items have description and price.
    • Some of the names are formed in the pattern {product name} - {model}, some are {product name} - {probably sku id}.
  • Most of the names include the brand / manufacturer.
  • There are 1097 matches in total.

Construct RLTK components

One thing you should notice here is that my Record class is not built in one shot. I usually do a very basic one first, then evaluate the linkage result to find what should be improved. It’s like a feedback loop: after several rounds of improvement, you end up with a better Record.

My personal assumption is that brand (manufacturer) and model can be two strong indicators: if two records have the same brand and the same model, there’s a very high probability that they belong to the same entity.

So I wrote a couple of functions to do tokenization, model and brand extraction, and name alias parsing.

[3]:
import rltk

tokenizer = rltk.CrfTokenizer()


model_stop_words = set([])
with open('Abt-Buy/rltk_exp/stop_words_model.txt') as f:
    for line in f:
        line = line.strip().lower()
        if line:
            model_stop_words.add(line)

def extract_possible_model(s):
    possible_models = []
    tokens = s.split(' ')

    for t in tokens:
        t = t.replace('(', '').replace(')', '')
        if len(t) < 2 or t in model_stop_words:
            continue

        if t.isdigit():
            possible_models.append(t)
            continue

        has_digit = has_alpha = False
        for c in t:
            if c.isdigit():
                has_digit = True
            elif c.isalpha():
                has_alpha = True
            if has_digit and has_alpha:
                possible_models.append(t)  # token mixes letters and digits: likely a model number
                break

    possible_models.sort(key=len, reverse=True)

    return possible_models[0] if len(possible_models) > 0 else ''


def tokenize(s):
    tokens = tokenizer.tokenize(s)
    return [w.lower() for w in tokens if w.isalpha()]


def get_brand_name(tokens):
    for word_len in range(min(5, len(tokens)), 0, -1):
        i = 0; j = i + word_len
        while j <= len(tokens):
            name = ' '.join(tokens[i:j])
            if name in brand_list:
                return name
            i += 1; j += 1
    return ''


def process_brand_alias(alias):
    return brand_mapping.get(alias, alias)


brand_list = set([])
with open('Abt-Buy/rltk_exp/brands.txt') as f:
    for line in f:
        line = line.strip().lower()
        if len(line) == 0:
            continue
        brand_list.add(' '.join(tokenize(line)))

brand_mapping = {}
with open('Abt-Buy/rltk_exp/brand_alias.txt') as f:
    for line in f:
        alias = [w.strip().lower() for w in line.split('|')]
        for name in alias:
            brand_mapping[name] = alias[0]

Then, I build Records and Datasets.

[4]:
@rltk.remove_raw_object
class AbtRecord(rltk.Record):
    def __init__(self, raw_object):
        super().__init__(raw_object)
        self.brand = ''

    @rltk.cached_property
    def id(self):
        return self.raw_object['id']

    @rltk.cached_property
    def name(self):
        return self.raw_object['name'].split(' - ')[0]

    @rltk.cached_property
    def name_tokens(self):
        tokens = tokenize(self.name)

        self.brand = get_brand_name(tokens)

        return set(tokens)

    @rltk.cached_property
    def model(self):
        ss = self.raw_object['name'].split(' - ')
        return ss[-1].strip() if len(ss) > 1 else ''

    @rltk.cached_property
    def description(self):
        return self.raw_object.get('description', '')

    @rltk.cached_property
    def price(self):
        p = self.raw_object.get('price', '')
        if p.startswith('$'):
            p = p[1:].replace(',', '')
        return p

    @rltk.cached_property
    def brand_cleaned(self):
        _ = self.name_tokens
        return process_brand_alias(self.brand)

    @rltk.cached_property
    def model_cleaned(self):
        m = self.model
        return m.lower().replace('-', '').replace('/', '').replace('&', '')
[5]:
@rltk.remove_raw_object
class BuyRecord(rltk.Record):
    def __init__(self, raw_object):
        super().__init__(raw_object)
        self.brand = ''

    @rltk.cached_property
    def id(self):
        return self.raw_object['id']

    @rltk.cached_property
    def name(self):
        return self.raw_object['name'].split(' - ')[0]

    @rltk.cached_property
    def name_tokens(self):
        tokens = tokenize(self.name)
        self.brand = get_brand_name(tokens)
        return set(tokens)

    @rltk.cached_property
    def description(self):
        return self.raw_object.get('description', '')

    @rltk.cached_property
    def manufacturer(self):
        return self.raw_object.get('manufacturer', '').lower()

    @rltk.cached_property
    def price(self):
        p = self.raw_object.get('price', '')
        if p.startswith('$'):
            p = p[1:].replace(',', '')
        return p

    @rltk.cached_property
    def model(self):
        ss = self.raw_object['name'].split(' - ')
        ss = ss[0].strip()

        return extract_possible_model(ss)

    @rltk.cached_property
    def name_suffix(self): # could probably be the model
        ss = self.raw_object['name'].split(' - ')
        return BuyRecord._clean(ss[-1]) if len(ss) > 1 else ''

    @staticmethod
    def _clean(s):
        return s.lower().replace('-', '').replace('/', '').replace('&', '')

    @rltk.cached_property
    def brand_cleaned(self):
        _ = self.name_tokens
        manufacturer = self.manufacturer
        return process_brand_alias(manufacturer if manufacturer != '' else self.brand)

    @rltk.cached_property
    def model_cleaned(self):
        m = self.model
        return BuyRecord._clean(m)
[6]:
ds_abt = rltk.Dataset(reader=rltk.CSVReader(open('datasets/Abt-Buy/Abt.csv', encoding='latin-1')),
                   record_class=AbtRecord, adapter=rltk.MemoryKeyValueAdapter())

ds_buy = rltk.Dataset(reader=rltk.CSVReader(open('datasets/Abt-Buy/Buy.csv', encoding='latin-1')),
                   record_class=BuyRecord, adapter=rltk.MemoryKeyValueAdapter())

Notes:

  • cached_property is used for pre-computing. It’s recommended when computing the property is time consuming.
  • Because cached_property is used and no property needs raw_object anymore, remove_raw_object is set to release the space used by raw_object.
  • If you are using a persistent Adapter (Redis, HBase) in a Dataset, you can reuse it by calling rltk.Dataset(adapter=...) without other parameters.

Blocking

Blocking can eliminate a lot of unnecessary computation (but it also introduces false positives and false negatives, which can be evaluated by pair completeness and reduction ratio). Here I use a simple trigram blocking key, which is practical and widely used in the real world.

[7]:
def simple_ngram(s, n=3):
    return [s[i:i + n] for i in range(len(s) - (n - 1))]

bg = rltk.TokenBlockGenerator()
block = bg.generate(
    bg.block(ds_abt, function_=lambda r: simple_ngram(r.name, 3)),
    bg.block(ds_buy, function_=lambda r: simple_ngram(r.name, 3))
)

Rule based solution

One traditional way of solving the record linkage problem is to use rules.

Build ground truth

Since abt_buy_perfectMapping.csv contains all positives, any other combination of two records should be a negative. There are a lot of ways to generate negatives, and RLTK provides many methods.

My plan here is to use all perfect matches as positives and generate all negatives based on the cross product of all ids that appear in these matches.

[8]:
gt = rltk.GroundTruth()
with open('datasets/Abt-Buy/abt_buy_perfectMapping.csv', encoding='latin-1') as f:
    for d in rltk.CSVReader(f): # this can be replaced with Python's built-in csv reader
        gt.add_positive(d['idAbt'], d['idBuy'])
gt.generate_all_negatives(ds_abt, ds_buy, range_in_gt=True)

Generate results

Let’s come up with some rules and generate results.

[9]:
def rule_based_method(r_abt, r_buy):
    brand_score = 0
    if r_abt.brand_cleaned and r_buy.brand_cleaned:
        if r_abt.brand_cleaned == r_buy.brand_cleaned:
            brand_score = 1
    model_score = 0
    if r_abt.model_cleaned and r_buy.model_cleaned:
        if r_abt.model_cleaned == r_buy.model_cleaned:
            model_score = 1
    jaccard_score = rltk.jaccard_index_similarity(r_abt.name_tokens, r_buy.name_tokens)

    if model_score == 1:
        return True, 1

    total = brand_score * 0.3 + model_score * 0.3 + jaccard_score * 0.4
    return total > 0.45, total

Trial can be used to record and evaluate results.

[10]:
trial = rltk.Trial(gt)
candidate_pairs = rltk.get_record_pairs(ds_abt, ds_buy, ground_truth=gt, block=block)
for r_abt, r_buy in candidate_pairs:
    result, confidence = rule_based_method(r_abt, r_buy)
    trial.add_result(r_abt, r_buy, result, confidence)

Evaluation

[11]:
trial.evaluate()
print(trial.true_positives, trial.false_positives, trial.true_negatives, trial.false_negatives,
      trial.precision, trial.recall, trial.f_measure)
print('tp:', len(trial.true_positives_list))
print('fp:', len(trial.false_positives_list))
print('tn:', len(trial.true_negatives_list))
print('fn:', len(trial.false_negatives_list))
0.7620190210278335 0.02891807612686452 0.9710819238731355 0.23798097897216647 0.26020826195122676 0.7620190210278335 0.387944341414119
tp: 17467
fp: 49660
tn: 1667605
fn: 5455

Instead of setting a pre-computed is_positive mark, the threshold can be decided at evaluation time. What’s more, if you have a collection of Trials, you can use rltk.Evaluation to plot a chart.

[12]:
eva = rltk.Evaluation()
for threshold in range(0, 10):
    threshold = float(threshold) / 10
    t = trial.clone() # remember to clone it
    t.evaluate(threshold)
    eva.add_trial(t)
eva.plot_precision_recall().show()
_images/real_world_example_22_0.png

Machine learning solution

Another approach is to use machine learning techniques. Scikit-learn is used here (run pip install -U scikit-learn to install it).

Feature vector

The basic idea of using machine learning is to construct a feature vector for each pair and use it to train a model; the model can then predict whether an input feature vector indicates a matching pair or not.

[13]:
tfidf = rltk.TF_IDF()
for r in ds_abt:
    tfidf.add_document(r.id, r.name_tokens)
for r in ds_buy:
    tfidf.add_document(r.id, r.name_tokens)
tfidf.pre_compute()

def generate_feature_vector(r_abt, r_buy):
    # brand
    brand_score = 0.2
    brand_marker = 0
    if r_abt.brand_cleaned and r_buy.brand_cleaned:
        if r_abt.brand_cleaned == r_buy.brand_cleaned:
            brand_score = 1
            brand_marker = 1


    # model 1
    model_score = 0.2
    model_marker = 0
    if r_abt.model_cleaned and r_buy.model_cleaned:
        if r_abt.model_cleaned == r_buy.model_cleaned:
            model_score = 1
            model_marker = 1
        else:
            if len(r_abt.model_cleaned) > len(r_buy.model_cleaned):
                if r_abt.model_cleaned.startswith(r_buy.model_cleaned) \
                        or r_abt.model_cleaned.endswith(r_buy.model_cleaned):
                    model_score = 1
                    model_marker = 1
                else:
                    model_score = rltk.levenshtein_similarity(r_abt.model_cleaned, r_buy.model_cleaned)
            elif len(r_abt.model_cleaned) < len(r_buy.model_cleaned):
                if r_buy.model_cleaned.startswith(r_abt.model_cleaned) \
                        or r_buy.model_cleaned.endswith(r_abt.model_cleaned):
                    model_score = 1
                    model_marker = 1
                else:
                    model_score = rltk.levenshtein_similarity(r_abt.model_cleaned, r_buy.model_cleaned)
            else:
                model_score = 0

    # model 2
    model2_score = rltk.levenshtein_similarity(r_abt.model_cleaned, r_buy.name_suffix)

    # name tokens jaccard
    jaccard_score = rltk.jaccard_index_similarity(r_abt.name_tokens, r_buy.name_tokens)

    tfidf_score = tfidf.similarity(r_abt.id, r_buy.id)

    # price
    if r_abt.price and r_buy.price:
        price_marker = 1
        abt_price = float(r_abt.price)
        buy_price = float(r_buy.price)
        if abt_price == 0 and buy_price == 0:
            price_difference = 0
        else:
            price_difference = float(abs(abt_price - buy_price)) / max(abt_price, buy_price)
    else:
        price_marker = 0
        price_difference = 0

    return [brand_score, brand_marker, model_score, model_marker,
            model2_score, jaccard_score, tfidf_score, price_difference, price_marker]

Train test split

[14]:
gt.remove_negatives()
gt_train, gt_test = gt.train_test_split(test_ratio=0.3)

Generate stratified negatives for ground truth

In order to train a machine learning model, roughly the same number of negatives as positives needs to be provided. But how to sample negatives is a problem: random sampling may only give the training algorithm very easy-to-detect negatives.

So I’m going to do stratified sampling. RLTK has a built-in method called generate_stratified_negatives. You only need to provide a clustering function and tell RLTK the total number of clusters and the total number of negatives you want; it then generates negatives and picks them based on the positive-to-negative ratio of each cluster.

For testing, I want the model to be validated on all possible combinations of pairs.

[15]:
from sklearn.cluster import KMeans

X_km = []
for id_abt, id_buy, _ in gt_train:
    r_abt = ds_abt.get_record(id_abt)
    r_buy = ds_buy.get_record(id_buy)
    X_km.append(generate_feature_vector(r_abt, r_buy))
kmeans_model = KMeans(n_clusters=10, random_state=0).fit(X_km)


def classify(r_abt, r_buy):
    v = generate_feature_vector(r_abt, r_buy)
    cluster_id = kmeans_model.predict([v])[0]
    return cluster_id

gt_train.generate_stratified_negatives(ds_abt, ds_buy, classify, 10, range_in_gt=True, exclude_from=gt_test)
gt_test.generate_all_negatives(ds_abt, ds_buy, range_in_gt=True, exclude_from=gt_train)

Train and test

After preparation, it’s time to train and test model.

[16]:
from sklearn import svm
from sklearn.model_selection import GridSearchCV

# train
X, y = [], []
train_pairs = rltk.get_record_pairs(ds_abt, ds_buy, ground_truth=gt_train)
for r_abt, r_buy in train_pairs:
    v = generate_feature_vector(r_abt, r_buy)
    X.append(v)
    y.append(gt_train.get_label(r_abt.id, r_buy.id))

clf = svm.SVC(probability=True)
res = 5
clf = GridSearchCV(clf,
                   {'C' : [i / res for i in range(1, res + 1)],
                    'gamma' : [i / (100 * res) for i in range(0, res + 1)]},
                  cv=3)
clf.fit(X, y)

# test
trial = rltk.Trial(ground_truth=gt_test)
for r_abt, r_buy in rltk.get_record_pairs(ds_abt, ds_buy, ground_truth=gt_test):
    # ml
    v = generate_feature_vector(r_abt, r_buy)
    vv = clf.predict_proba([v])[0][1]
    trial.add_result(r_abt, r_buy, vv > 0.3,
                     confidence=vv,
                     feature_vector=v)

Though Abt-Buy contains a few many-to-many pairs, if I restrict the result to one-to-one pairs (by using Munkres), false positives drop and the F-measure increases.

[17]:
for threshold in [round(x * 0.1, 1) for x in range(0, 10)]:
    trial.run_munkres(threshold=threshold)
    trial.evaluate()
    print('threshold:', threshold, 'f-measure:', trial.f_measure)
threshold: 0.0 f-measure: 0.8462709284627092
threshold: 0.1 f-measure: 0.8462709284627092
threshold: 0.2 f-measure: 0.8416149068322981
threshold: 0.3 f-measure: 0.8538587848932677
threshold: 0.4 f-measure: 0.75
threshold: 0.5 f-measure: 0.7252336448598131
threshold: 0.6 f-measure: 0.6666666666666666
threshold: 0.7 f-measure: 0.628099173553719
threshold: 0.8 f-measure: 0.5482456140350878
threshold: 0.9 f-measure: 0.14124293785310732

Scaling and Optimization

One important feature of RLTK is scalability. It can work with very limited resources or utilize large amounts of resources.

Set proper arguments

Some of the methods have optional / required arguments for buffer size, chunk size, queue size, etc. Giving them proper values according to your machine’s specifications can avoid a lot of unnecessary memory-disk swap operations.

Parallel processing

Here you need to use a package called pyrallel.

General parallel processing

If you have some compute-intensive procedures and your machine has more than one CPU core, pyrallel.ParallelProcessor is a tool to try. You can find more detailed information in its API documentation, but in general, it encapsulates multiprocessing for parallel computation and multithreading for data collection.

import pyrallel

result = []

def heavy_calculation(x, y):
    return x * x, y + 5

def output_handler(r1, r2):
    result.append(r1 if r1 > r2 else r2)

pp = pyrallel.ParallelProcessor(8, mapper=heavy_calculation, collector=output_handler)
pp.start()

for i in range(8):
    pp.add_task(i, i + 1)

pp.task_done()
pp.join()

print(result)

MapReduce

The above solution uses one thread (in the main process) to collect the calculated data. If you want to do something like divide and conquer, especially when the “conquer” step needs heavy computation, you may need the pyrallel.MapReduce module.

import time
import pyrallel

def mapper(x):
    time.sleep(0.0001)
    return x

def reducer(r1, r2):
    return r1 + r2

mr = pyrallel.MapReduce(8, mapper, reducer)
for i in range(10000):
    mr.add_task(i)

mr.task_done()
result = mr.join()
print(result)

Distributed computing (Experimental)

Note

Running RLTK on one machine is not necessarily slower than running it on a cluster; performance depends on requirements, data and code. If you only have tiny datasets and a light task, parallel computing is not needed either: creating processes and switching thread contexts both have costs. Similarly, distributed computing adds IO overhead (especially network IO) and is harder to debug, so use it only when you really need it. Most of the time, refactoring code can give a bigger boost.

If you have extremely heavy computation or very large datasets, and you also have multiple idle machines, you may consider using distributed computing. More detailed usage is in the Remote API documentation.

First you need to set up a cluster. A cluster is formed by one scheduler and a number of workers.

To start a scheduler, do

python -m rltk remote.scheduler

Then on worker machines, do

python -m rltk remote.worker <scheduler ip>:8786 --nprocs <processors>

Second, change your code a bit and run it. The API for distributed computing is very similar to pyrallel.ParallelProcessor, but you need an rltk.remote.Remote object which connects to the scheduler, and an instance of rltk.remote.Task which has an input and an output handler.

def input_handler(r1, r2):
    return r1, r2, is_pair(r1, r2)

def output_handler(r1, r2, label):
    print(r1.id, r2.id, label)

remote = rltk.remote.Remote('127.0.0.1:8786')
task = rltk.remote.Task(remote, input_handler=input_handler, output_handler=output_handler)
task.start()

for r1, r2 in rltk.get_record_pairs(ds1, ds2):
    task.compute(r1, r2)

task.task_done()
task.join()

If the data is in a shared data store (file system or service), there’s no need to transfer record data through the scheduler to the workers, only the record ids. Workers can then fetch the data directly from the data store. So change your code so that input_handler accepts ids as input and fetches the record data itself.

def input_handler(id1, id2):
    r1, r2 = ds1.get(id1), ds2.get(id2)
    return r1, r2, is_pair(r1, r2)

task = rltk.remote.Task(remote, input_handler=input_handler, output_handler=output_handler)
task.start()

for r1, r2 in rltk.get_record_pairs(ds1, ds2):
    task.compute(r1.id, r2.id)

task.task_done()
task.join()

API Reference

Dataset

Record

Similarity

Normal metrics

Hybrid metrics

Phonetic metrics

Evaluation

GroundTruth

Trial

Evaluation

Blocking

Block

Block Black List

Block Generator

Blocking Helper

IO (Input & Output)

Reader

Generic Reader
GroundTruth Reader

Writer

Generic Writer
GroundTruth Writer

Adapter

Key Value Adapter
Key Set Adapter

Serializer

Utilities

Utilities

Command line interface (CLI)

Remote

RLTK’s remote module is based on Dask’s distributed. It has a scheduler, which coordinates the actions of several workers spread across multiple machines and the concurrent requests of several clients.

To start the scheduler, do:

python -m rltk remote.scheduler --port <port>

Then on worker machines, do

python -m rltk remote.worker <scheduler-ip>:<scheduler-port> --nprocs <processors>

Authentication is supported through Privacy Enhanced Mail (PEM) files. You can either get them from a CA (Certificate Authority) or generate self-signed PEM files locally. Here’s an example of generating PEMs using OpenSSL:

openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem

Then provide these PEM files when starting the scheduler and workers. If you don’t have a CA certificate, set tls-ca-file to the same file as tls-cert.

# scheduler
python -m rltk remote.scheduler --port <port> --tls-ca-file cert.pem --tls-cert cert.pem --tls-key key.pem

# worker, specify protocol TLS in scheduler's address
python -m rltk remote.worker tls://<scheduler-ip>:<scheduler-port> --tls-ca-file cert.pem --tls-cert cert.pem --tls-key key.pem

Dask provides a web UI to monitor scheduler and worker status, detailed usage can be found here.

Remote

Task

Tokenizer