numpy: NamedColumnArray subclass of ndarray

This is the place to post any code that you want to share with the community. Only completed scripts should be posted here.

numpy: NamedColumnArray subclass of ndarray

Postby iliar » Mon Apr 28, 2014 8:27 am

numpy does have the recarray class, but I find it very awkward to use: you can't simply call myrecarray.sum(axis=0), for example, and each element is a tuple.
So here's a class I made that has a name for each column. It is a Mapping, so each column can be accessed individually as in a dictionary, and it is also an ndarray with a homogeneous data type (not a recarray).
You can slice columns, assign to sliced subarrays, and even perform complicated slices involving both row indices and column names.
It is not limited to two-dimensional arrays: if your array has `n' dimensions, the n'th (last) dimension is the named one.

Anyway, besides being an extremely good lesson for me on the power of Python and numpy, it is also very useful for my work.

take a look:

(note: added a small bug fix, for a bug that comes about only when fieldnames are more than one letter :) , line 58)
(note2: another small bug fix, so columns may be accessed by an Iterable like a numpy ndarray, and the return value will contain corresponding fieldnames, line 68)

Code: Select all
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 27 11:44:20 2014

@author: Ilia
"""

import collections
import sys

import numpy as np

try:  # Python 3.3+: the container ABCs live in collections.abc
    from collections import abc as collections_abc
except ImportError:  # Python 2: they are still at the collections top level
    collections_abc = collections

class NamedColumnArray(np.ndarray, collections_abc.Mapping):
    """ndarray subclass whose last axis can be addressed by field name.

    The array keeps a homogeneous dtype (it is not a record array).  Each
    position along the last axis carries a name, stored in ``fieldnames``;
    ``fieldname_dict`` maps a name to its position and ``m`` is the number of
    names.  Names can be used wherever an index for the last axis is
    expected::

        arr['a']             # column 'a' (plain ndarray)
        arr[['a', 'c']]      # columns 'a' and 'c' (NamedColumnArray)
        arr[1:3, 'b']        # rows 1-2 of column 'b'
        arr[2] = {'a': 7}    # mapping assignment, one entry per name

    Selections that keep several named columns (slice, list of names, list
    or array of positions / booleans in the last position) come back as a
    ``NamedColumnArray``; everything else comes back as a plain ndarray or
    scalar.  Results of ufuncs and reductions are plain ndarrays.

    Duplicate names are allowed; ``fieldname_dict`` then points at the last
    occurrence of a duplicated name.
    """

    # Sentinel so that ``get`` can tell "no default given" from ``None``.
    _NO_DEFAULT = object()

    # Equality is element-wise, so instances must not be hashable.
    __hash__ = None

    def __new__(cls, input_array, fieldnames):
        """Create a named view of ``input_array``.

        Parameters:
            input_array: anything ``np.asarray`` accepts.
            fieldnames: iterable of names, one per entry of the last axis.

        Raises:
            ValueError: if the array is 0-dimensional or the number of names
                differs from the length of the last axis.
        """
        arr = np.asarray(input_array).view(cls)
        names = list(fieldnames)
        if arr.ndim == 0 or len(names) != arr.shape[-1]:
            raise ValueError('number of field names must match last dimension')
        arr.fieldnames = names
        arr.m = len(names)
        arr.fieldname_dict = {fn: i for i, fn in enumerate(names)}
        return arr

    def __array_finalize__(self, obj):
        """Copy the naming metadata from the template array ``obj``.

        When ``obj`` is not a NamedColumnArray (a plain ndarray, or ``None``
        for explicit construction) empty metadata is installed so the
        attributes always exist; ``__new__`` overwrites them afterwards.
        """
        if isinstance(obj, NamedColumnArray):
            self.m = obj.m
            self.fieldnames = list(obj.fieldnames)
            self.fieldname_dict = dict(obj.fieldname_dict)
        else:
            self.m = 0
            self.fieldnames = []
            self.fieldname_dict = {}

    @staticmethod
    def _is_name_sequence(candidate):
        """Return True for a non-empty, non-str sequence made only of str."""
        return (isinstance(candidate, collections_abc.Sequence)
                and not isinstance(candidate, str)
                and len(candidate) > 0
                and all(isinstance(x, str) for x in candidate))

    @staticmethod
    def _slice_from_bounds(_from, to):
        """Build the slice object matching Python 2 ``a[i:j]`` bounds."""
        if _from == 0 and to == sys.maxsize:
            # This is how Python 2 spells a bare ``a[:]``.
            return slice(None, None, None)
        return slice(_from, to, 1)

    def _interpret_item_indices(self, item):
        """Translate an index that may contain field names.

        Returns:
            ``(newitem, newfieldnames)``: ``newitem`` is an index numpy
            understands; ``newfieldnames`` lists the names of the selected
            columns, or is ``None`` when the selection should not be wrapped
            as a NamedColumnArray.

        Raises:
            IndexError: unknown name given alone or at the end of a tuple
                shorter than ``ndim``.
            KeyError: unknown name in a list of names, or at the end of a
                tuple that has ``ndim`` entries or contains an Ellipsis.
        """
        lookup = self.fieldname_dict

        if not isinstance(item, tuple):
            if isinstance(item, str):
                try:
                    return (Ellipsis, lookup[item]), None
                except KeyError:
                    raise IndexError('bad index: ' + item)
            if self._is_name_sequence(item):
                # Names always address the last axis, so anchor the positions
                # there; a bare list would index the first axis instead.
                return (Ellipsis, [lookup[fn] for fn in item]), item
            return item, None

        if not item:
            # Empty tuple: nothing to translate.
            return item, None

        head, last = item[:-1], item[-1]
        if len(item) == self.ndim or any(x is Ellipsis for x in item):
            # The final entry is taken to address the named (last) axis.
            if isinstance(last, str):
                return head + (lookup[last],), None
            if isinstance(last, slice):
                return item, self.fieldnames[last]
            if self._is_name_sequence(last):
                return head + ([lookup[fn] for fn in last],), last
            if isinstance(last, collections_abc.Iterable):
                # Positions or boolean mask: resolve to integer positions so
                # the matching names can be collected.
                positions = np.arange(self.m)[last]
                names = [self.fieldnames[i] for i in positions]
                return head + (positions,), names
            # Integers, Ellipsis, None, ...: pass through unchanged.
            return item, None

        if isinstance(last, str):
            # Fewer indices than dimensions: the Ellipsis keeps the name on
            # the last axis rather than on whichever axis comes next.
            try:
                return head + (Ellipsis, lookup[last]), None
            except KeyError:
                raise IndexError('bad index: ' + last)
        return item, None

    def __contains__(self, item):
        """Test for a field name (str) or for a value stored in the array."""
        if isinstance(item, str):
            return item in self.fieldnames
        # Compare on a plain view: the overridden __eq__ answers False for
        # anything that is not a NamedColumnArray.
        return bool(np.any(np.asarray(self) == item))

    def __eq__(self, other):
        """Return False unless ``other`` is a NamedColumnArray with the same
        field names; in that case return the element-wise comparison."""
        if not isinstance(other, NamedColumnArray):
            return False
        if self.fieldnames != other.fieldnames:
            return False
        return np.asarray(self) == np.asarray(other)

    def __ne__(self, other):
        """Logical complement of ``__eq__``: True, or an element-wise array."""
        if not isinstance(other, NamedColumnArray):
            return True
        if self.fieldnames != other.fieldnames:
            return True
        return np.asarray(self) != np.asarray(other)

    def __getitem__(self, item):
        """Index with integers, slices, names or lists of names.

        Returns a NamedColumnArray when the selection keeps named columns,
        otherwise a plain ndarray or scalar.
        """
        newitem, newfieldnames = self._interpret_item_indices(item)
        retval = np.ndarray.__getitem__(self, newitem)
        if isinstance(retval, np.ndarray):
            # Drop the subclass (and its now possibly wrong metadata).
            retval = retval.view(np.ndarray)
        if newfieldnames is not None:
            retval = NamedColumnArray(retval, newfieldnames)
        return retval

    def __setitem__(self, item, value):
        """Assign ``value`` at ``item``.

        When ``value`` is a Mapping (including another NamedColumnArray) each
        of its keys is treated as a field name and assigned separately at
        ``item + (key,)``.
        """
        if isinstance(value, collections_abc.Mapping):
            prefix = item if isinstance(item, tuple) else (item,)
            # Use keys() explicitly: iterating a NamedColumnArray yields rows.
            for name in value.keys():
                self[prefix + (name,)] = value[name]
        else:
            newitem, _ = self._interpret_item_indices(item)
            np.ndarray.__setitem__(self, newitem, value)

    def __getslice__(self, _from, to):
        """Python 2 slice hook; forwards to ``__getitem__``."""
        return self.__getitem__(self._slice_from_bounds(_from, to))

    def __setslice__(self, _from, to, value):
        """Python 2 slice-assignment hook; forwards to ``__setitem__``."""
        self.__setitem__(self._slice_from_bounds(_from, to), value)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Return ufunc / reduction results as plain ndarrays.

        This makes sure that e.g. ``max`` and ``sum`` do not return a
        NamedColumnArray.  The result is returned explicitly instead of
        relying on ``None`` being treated as "use the default".
        ``return_scalar`` is the extra argument newer numpy versions pass.
        """
        plain = np.asarray(out_arr)
        if return_scalar and plain.ndim == 0:
            return plain[()]
        return plain

    def get(self, key, default=_NO_DEFAULT):
        """Return ``self[key]``.

        If the lookup raises IndexError or KeyError and ``default`` was
        supplied, ``default`` is returned; without a default the error
        propagates.
        """
        try:
            return self[key]
        except (IndexError, KeyError):
            if default is self._NO_DEFAULT:
                raise
            return default

    def items(self):
        """Return a list of ``(name, column)`` pairs."""
        return list(self.iteritems())

    def keys(self):
        """Return a copy of the list of field names."""
        return list(self.fieldnames)

    def values(self):
        """Return the list of columns, in field-name order."""
        return [self[fn] for fn in self.fieldnames]

    def iterkeys(self):
        """Iterate over the field names."""
        return iter(list(self.fieldnames))

    def itervalues(self):
        """Iterate lazily over the columns, in field-name order."""
        return (self[fn] for fn in self.fieldnames)

    def iteritems(self):
        """Iterate lazily over ``(name, column)`` pairs."""
        return ((fn, self[fn]) for fn in self.fieldnames)

    def __str__(self):
        """Comma-separated field names on one line, then the array."""
        header = ','.join(str(fn) for fn in self.fieldnames)
        return header + '\n' + np.ndarray.__str__(self)

    def __repr__(self):
        """``TypeName(['a','b'],array(...))``."""
        return "{typename}(['{fieldnames}'],{array})".format(
            typename=type(self).__name__,
            fieldnames="','".join(str(fn) for fn in self.fieldnames),
            array=repr(np.asarray(self)))
if __name__=="__main__":
    # Demo / smoke test for NamedColumnArray.  Every print is written so that
    # it produces the same output under Python 2 and Python 3.
    class D(NamedColumnArray):
        """Example subclass that carries an extra ``info`` attribute."""
        def __new__(cls,info,*args,**kwargs):
            print(args)
            # Pass cls through so subclasses of D are built as themselves.
            obj = NamedColumnArray.__new__(cls,*args,**kwargs)
            obj.info = info
            return obj

    # Name-based reads and writes on a two-column array.
    d = D('info',np.random.rand(5,2),['xa','y'])
    print('xa {0}'.format(d['xa']))
    d[1,'xa']=5
    print(d)
    d[2] = {'xa':7,'y':8}
    print(d)
    d[3,['y','xa']] = [1,2]
    print(d[:,['xa','xa','y','xa']])
    print(repr(d))
    print('')

    nca = NamedColumnArray(np.random.rand(5,4),['a','b','c','d'])
    print('')
    print('get_item')
    x=nca['a']
    print(type(x))
    print('')
    print('get_item return NamedColumnArray')
    z = nca[...,1:3]
    print('')
    print('sum')
    s=nca.sum(axis=0)
    print(type(s))
    print('')
    print('max')
    m=nca.max(axis=0)
    print(type(m))
    print('')
    print('mul')
    y = np.random.rand(*nca.shape) * nca
    print(type(y))
    print('')
    rows = nca[1:5]
    print('{0} {1}'.format(type(rows),rows))
    print('')

    # Assigning a NamedColumnArray writes each of its columns into the
    # column of the same name in the target.
    print('insert at random indices')
    height = 3
    random_numbers = NamedColumnArray(
        np.random.randint(low=10,high=20,size=(height+1,2)),
        ['b','c'])
    print(random_numbers)
    print('')
    nca[slice(1,(1+height+1),1)] = random_numbers
    print(nca)

Last edited by Mekire on Mon Apr 28, 2014 9:01 am, edited 1 time in total.
Reason: Lock. Reply to this post if you need to edit and we can change it. Don't double post.
iliar
 
Posts: 1
Joined: Mon Apr 28, 2014 8:07 am

Return to Completed Scripts

Who is online

Users browsing this forum: W3C [Linkcheck] and 1 guest