ucl_mlp_keras_init_weights.py
# NOTE: legacy script -- Python 2 print statements and the early (0.x)
# Keras API (Dense(n_in, n_out), nb_epoch) on the Theano backend.
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
import keras
import numpy as np
import theano
import linecache
import math
from sklearn.metrics import roc_auc_score
from sklearn.metrics import mean_squared_error

rng = np.random

# parameters
train_size = 312437                      # training set size
test_size = 156063                       # test set size
train_file = './train.dl.bin.notag.txt'  # training file
test_file = './test.dl.bin.notag.txt'    # test file
n_epoch = 1                              # number of epochs
batch_size_train = 30                    # batch size of the train set
feats = 65                               # feature size
n_layer_one = 361                        # number of units in the bottom layer
# n_feats_before_fm = 16                 # number of original features

# load the whole test set; each line is "label,feat_1,...,feat_<feats>"
def getAllTestData():
    array = []
    arrayY = []
    for i in range(test_size):
        line = linecache.getline(test_file, i + 1)
        if line.strip() != "":
            y = line[0:line.index(',')]
            x = line[line.index(',') + 1:]
            arr = [float(xx) for xx in x.split(',')]
            array.append(arr)
            arrayY.append(int(y))
    xarray = np.array(array, dtype=theano.config.floatX)
    yarray = np.array(arrayY, dtype=np.int32)
    return [xarray, yarray]

# load the whole training set (same format as the test set)
def getAllTrainData():
    array = []
    arrayY = []
    for i in range(train_size):
        line = linecache.getline(train_file, i + 1)
        if line.strip() != "":
            y = line[0:line.index(',')]
            x = line[line.index(',') + 1:]
            arr = [float(xx) for xx in x.split(',')]
            array.append(arr)
            arrayY.append(int(y))
    xarray = np.array(array, dtype=theano.config.floatX)
    yarray = np.array(arrayY, dtype=np.int32)
    return [xarray, yarray]

# init train/test data
X_train, y_train = getAllTrainData()
y_train = y_train.tolist()
X_test, y_test = getAllTestData()
y_test = y_test.tolist()
print 'test set shape:', X_test.shape[0]

# init weights: a commented-out Glorot-style uniform initialisation
# (bounds +/- sqrt(6 / (fan_in + fan_out))), replaced by the
# hand-wired weights constructed below
# W_values = [np.array(rng.uniform(
#     low=-np.sqrt(6. / (feats + n_layer_one)),
#     high=np.sqrt(6. / (feats + n_layer_one)),
#     size=(feats, n_layer_one)
#     ), dtype=theano.config.floatX), np.zeros(n_layer_one)]
#
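# ------------------------------------------------------------------
# Added note (not in the original source): the three helpers below
# hand-wire a sparse 0/1 connectivity pattern between the inputs and
# the bottom-layer units. Mechanically:
#   * get_j(k) returns 3*(1+2+...+k) = 3k(k+1)/2, the index of the
#     last hidden unit in group k;
#   * get_numj(j) inverts this, mapping hidden unit j to its group k
#     (group k holds 3k consecutive units);
#   * get_weight(i, j) returns 1 when input i should connect to
#     hidden unit j under this grouping, and 0 otherwise.
# The commented-out n_feats_before_fm = 16 above suggests the groups
# correspond to the original features (16 pairs-of-features * 3 units
# would give 360 of the 361 units), but that is a reading of the
# code, not something the source states.
# ------------------------------------------------------------------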
def get_numj(num):
    k = 1
    sum = 0
    for i in range(1, n_layer_one):
        if i == num:
            return k
        elif i == (k * 3 + sum):
            sum = sum + k * 3
            k = k + 1

def get_j(k):
    sum = 0
    for i in range(1, k + 1):
        sum = sum + i * 3
    return sum

def get_weight(i, j):
    if j == 1:
        numj = 1
    else:
        numj = get_numj(j)  # the max num should be the number of original features
    numi = (i - 1) / 3 + 1
    if numi <= numj:
        if (((numi * 3 - i) == ((get_j(numj) - (int((get_j(numj) - j) / 3) * 3)) - j))
                and (numi == (numj - int((get_j(numj) - j) / 3)))):
            return 1
        else:
            return 0
    elif numi == (numj + 1):
        if (int(get_j(numj)) - j) == (3 * numi - i):
            return 1
        else:
            return 0
    else:
        return 0

ww = np.zeros((feats, n_layer_one))
# for i in range(feats):
#     for j in range(n_layer_one):
#         if ((i == 0 or (i % 4) == 1) and j == 0):
#             ww[i, j] = 1
#         elif (j > 0 and i > 1) and ((i - 1) % 4 != 0):
#             ww[i, j] = get_weight(i - ((i - 1) / 4) - 1, j)  # indexes have been changed from 1 and without index w
#
# # check the weights for testing
# for i in range(feats):
#     for j in range(n_layer_one):
#         if ww[i, j] == 1:
#             print 'i: ' + str(i) + ' j:' + str(j) + ' :' + str(ww[i, j])

for i in range(feats):
    for j in range(n_layer_one):
        if (i == 0 or (i % 4) == 1) and j == 0:
            ww[i, j] = 1
W_values = [np.asarray(ww), np.zeros(n_layer_one)]
ww2 = [np.ones((n_layer_one, 1)), np.zeros(1)]

# define the multi-layer perceptron
model = Sequential()
model.add(Dense(feats, n_layer_one, weights=W_values))
model.add(Activation('linear'))
# model.add(Dropout(0))
model.add(Dense(n_layer_one, 1, weights=ww2))
model.add(Activation('sigmoid'))
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(loss='binary_crossentropy', optimizer=sgd)

# test error before training
yp = model.predict_proba(X_test, batch_size=10)
auc = roc_auc_score(y_test, yp)
rmse = math.sqrt(mean_squared_error(y_test, yp))
print 'Init Test Err: ' + str(auc) + '\t' + str(rmse)

# print the current test error (AUC and RMSE)
def get_evaluation():
    yp = model.predict_proba(X_test, batch_size=100)
    auc = roc_auc_score(y_test, yp)
    rmse = math.sqrt(mean_squared_error(y_test, yp))
    print 'Test Err: ' + str(auc) + '\t' + str(rmse)

# record the loss and evaluate on the test set after every epoch
class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.losses = []

    def on_epoch_end(self, epoch, logs={}):
        self.losses.append(logs.get('loss'))
        get_evaluation()

# training
history = LossHistory()
model.fit(X_train, y_train, nb_epoch=n_epoch, batch_size=batch_size_train, callbacks=[history])
# proba = model.predict_proba(X_test, batch_size=32)
# score = model.evaluate(X_test, y_test, batch_size=16)
# print history.losses
# for layer in model.layers:
#     weights = layer.get_weights()
#     print weights
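# ------------------------------------------------------------------
# Usage sketch (added, not in the original source). The loaders above
# expect plain-text files with one sample per line in the form
# "label,feat_1,...,feat_65" with a 0/1 label. The commented snippet
# below writes small synthetic files in that format so the script can
# be smoke-tested; write_synthetic and the row counts are hypothetical
# stand-ins, and train_size/test_size above would need to match.
# ------------------------------------------------------------------
# def write_synthetic(path, n_rows, n_feats=65, seed=0):
#     r = np.random.RandomState(seed)
#     with open(path, 'w') as f:
#         for _ in range(n_rows):
#             label = r.randint(0, 2)
#             row = ','.join('%.4f' % v for v in r.rand(n_feats))
#             f.write(str(label) + ',' + row + '\n')
#
# write_synthetic('./train.dl.bin.notag.txt', 1000)
# write_synthetic('./test.dl.bin.notag.txt', 500)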