-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path categorize_articles.py
executable file
·165 lines (136 loc) · 4.68 KB
/
categorize_articles.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import os
from pprint import pprint
import re
import tarfile
import sys
import datetime
import time
import subprocess
from model.model import TxtClassificationModel
from multiprocessing import Pool,Manager
import multiprocessing
from multiprocessing.pool import ThreadPool
from shutil import copyfile
class CategorizeArticles(object):
    """
    Read timestamped article directories from the filesystem and
    categorize their contents with the generated text-classification
    model.

    Directories are expected to be named like ``YYYY-MM-DD-HH-MM-SS``
    (matching ``match``); only those whose parsed date falls inside
    ``[start, end]`` are processed.
    """
    def __init__(self,
                 match=r'\d+-\d+-\d+-\d+-\d+-\d+',
                 start=datetime.datetime(2009, 1, 1),
                 end=datetime.datetime(2014, 12, 31)):
        # match selects which directory names look like timestamps;
        # start/end bound the date range that gets classified.
        self.start_date = start
        self.end_date = end
        self.date_match = re.compile(match)

    def run(self, path, cpu_count=None, match=None, start=None, end=None):
        """
        Fan directory names out to worker processes and collect the
        classifier predictions.

        path      -- root directory containing timestamped article dirs
        cpu_count -- CPUs to use; defaults to all available
        match/start/end -- optional per-run overrides of the values
                           given to __init__ (None keeps the current one)

        Returns a flat list of prediction results from all workers.
        """
        # Honor the override parameters (they were accepted but silently
        # ignored before); None leaves the instance configuration alone.
        if match is not None:
            self.date_match = re.compile(match)
        if start is not None:
            self.start_date = start
        if end is not None:
            self.end_date = end
        if not cpu_count:
            cpu_count = multiprocessing.cpu_count()
        # Leave one CPU free for the parent process when possible.
        n_procs = cpu_count - 1 if cpu_count > 1 else 1
        directories = os.listdir(path)
        mgm = Manager()
        job_queue = mgm.Queue()
        result_queue = mgm.Queue()
        # Add work items to the shared job queue.
        for directory in directories:
            job_queue.put(directory)
        # Spawn the worker pool; each worker drains the job queue.
        pool = []
        for _ in range(n_procs):
            p = multiprocessing.Process(
                target=categorize_articles,
                args=((job_queue, self.date_match, self.start_date,
                       self.end_date, path, result_queue),),)
            p.start()
            pool.append(p)
        for p in pool:
            p.join()
        print("Text Classification complete...")
        prediction_results = []
        # Flatten each worker's list of predictions into one result list.
        while not result_queue.empty():
            prediction_results.extend(result_queue.get(block=False))
        print("Text Classification complete...1")
        return prediction_results
def categorize_articles(arg_list):
    """
    Worker entry point: drain directory names from the shared job queue
    and push the classifier's predictions onto the result queue.

    arg_list -- (job_queue, date_match, start_date, end_date, path,
                 result_queue) packed as one tuple for
                 multiprocessing.Process.

    Raises Exception when a matched directory does not exist on disk.
    """
    # Empty is raised when another worker wins the race for the last
    # queue item; import it portably (Queue on py2, queue on py3).
    try:
        from Queue import Empty
    except ImportError:
        from queue import Empty
    (job_queue, date_match, start_date,
     end_date, path, result_queue) = arg_list
    txt_classifier = TxtClassificationModel()
    while not job_queue.empty():
        # empty()/get() is not atomic across processes: another worker
        # may take the last item between the check and the get.
        try:
            directory = job_queue.get(block=False)
        except Empty:
            break
        if not date_match.match(directory):
            continue
        try:
            date = datetime.datetime.strptime(directory, '%Y-%m-%d-%H-%M-%S')
        except ValueError:
            # Regex-matched name that is not a real date (e.g. month 13);
            # skip it instead of crashing the worker.
            continue
        if not (start_date <= date <= end_date):
            continue
        pprint(directory)
        # - Path to location of extracted files
        articles_location = path + '/' + directory
        if not os.path.exists(articles_location):
            raise Exception('File location not valid.', articles_location)
        # Fully qualified names of the text/html documents to classify.
        # NOTE(review): substring test (not endswith) kept from the
        # original — 'foo.txt.bak' would also match; confirm intent.
        articles_list = [articles_location + '/' + file_name
                         for file_name in os.listdir(articles_location)
                         if 'txt' in file_name or 'html' in file_name]
        # - Get predictions and hand them back to the parent.
        preds = txt_classifier.predict_labels(articles_list, directory)
        result_queue.put(preds)
def move_files_newsblaster(path):
    '''
    One-off script: expose each timestamped directory's extracted
    newsblaster article files at the directory's top level, fanning the
    work out over a process pool.

    path -- root directory containing the timestamped directories
    '''
    # BUG FIX: the original first called os.listdir(directory_location),
    # referencing an undefined global (NameError); `path` is the
    # intended root and was already listed on the following lines.
    directories = os.listdir(path)
    pool = Pool(20)
    for directory in directories:
        pool.apply_async(move_file_p, args=((directory, path),))
    pool.close()
    pool.join()
def move_file_p(args):
    """
    Worker: symlink a directory's extracted article files up to the
    directory's top level and clear stale classification results.

    args -- (directory, path) packed as one tuple for Pool.apply_async

    Looks for the newsblaster ``clean`` article directory under one of
    two historical filesystem layouts; if neither exists, nothing is
    linked.
    """
    directory, path = args
    print(directory)
    # Two historical layouts for the extracted-article directory.
    files_location_2 = (path + '/' + directory +
                        "/proj/nlp/users/blaster/newsblaster/data/ArticleExtractor/clean/")
    files_location_3 = (path + '/' + directory +
                        "/proj/nlpdisk3/nlpusers/blaster/newsblaster/data/ArticleExtractor/clean/")
    files = []
    files_location = ''
    if os.path.exists(files_location_2):
        files = os.listdir(files_location_2)
        files_location = files_location_2
    elif os.path.exists(files_location_3):
        files = os.listdir(files_location_3)
        files_location = files_location_3
    print(files_location)
    mv_path = path + '/' + directory + '/'
    for file_name in files:
        src = files_location + '/' + file_name
        dst = mv_path + file_name
        # Symlink rather than copy to avoid duplicating article data.
        # lexists guard makes reruns idempotent: os.symlink raises
        # OSError when the link target already exists.
        if not os.path.lexists(dst):
            os.symlink(src, dst)
    # Delete stale classification results from a previous run.
    cat_path = mv_path + 'cat_results_path.txt'
    file_path = mv_path + 'file_list.txt'
    if os.path.exists(cat_path):
        os.remove(cat_path)
    if os.path.exists(file_path):
        os.remove(file_path)
if __name__ == '__main__':
    # Categorize every article directory under the data root.
    categorizer = CategorizeArticles()
    # BUG FIX: run() was called with only `path`, but its signature
    # requires cpu_count/match/start/end as well (TypeError). Pass None
    # for each so the categorizer's defaults apply.
    results = categorizer.run(
        '/home/dvc2106/newsblaster_project/binaryNLP/data',
        None, None, None, None)
    pprint(results)
    # directory_location = '/home/dvc2106/newsblaster_project/nb_migration/stream'
    # move_files_newsblaster(directory_location)