Hi there! Currently I’m working on a project whose main goal is to provide a RESTful API to be consumed by a third-party application.
I’m writing it using django-piston.

So, today I came across a problem which was basically to make sure a request wouldn’t trigger any handler methods if it wasn’t authenticated.
Those who have already had this problem know that most of the time the solution is simply to add some sort of login_required() decorator to your methods. The problem is that every time you want this behaviour in a class method you have to decorate it.

To overcome this issue, today I wrote a class decorator which made my code look cleaner, and tonight I decided to write about how to write a class decorator, hoping that this might be as useful for someone else as it was for me.

So, let’s start with the classic method decorator where you have to decorate every single method of your class. The code will look something like this:

# file:  method_decorator.py

def method_decorator(fn):
    "Example of a method decorator"
    def decorator(*args, **kwargs):
        print "\tInside the decorator"
        return fn(*args, **kwargs)

    return decorator

class MyFirstClass(object):
    """
    This class has all its methods decorated
    """
    @method_decorator
    def first_method(self, *args, **kwargs):
        print "\t\tthis is the MyFirstClass.first_method"

    @method_decorator
    def second_method(self, *args, **kwargs):
        print "\t\tthis is the MyFirstClass.second_method"

if __name__ == "__main__":
    print "::: With decorated methods :::"
    x = MyFirstClass()
    x.first_method()
    x.second_method()

If you run this code you will get something like:

[apalma@insys tests] python method_decorator.py 
::: With decorated methods :::
	Inside the decorator
		this is the MyFirstClass.first_method
	Inside the decorator
		this is the MyFirstClass.second_method

Obviously, following this approach you have to go through all the available methods and decorate every one whose default behaviour you want to change.

The other solution is to write a class decorator, where you select which methods to decorate by passing their names as the decorator’s arguments.

The following code will make this happen:

# file:  class_decorator.py
def method_decorator(fn):
    "Example of a method decorator"
    def decorator(*args, **kwargs):
        print "\tInside the decorator"
        return fn(*args, **kwargs)

    return decorator


def class_decorator(*method_names):
    def class_rebuilder(cls):
        "The class decorator example"
        class NewClass(cls):
            "This is the overwritten class"
            def __getattribute__(self, attr_name):
                obj = super(NewClass, self).__getattribute__(attr_name)
                if hasattr(obj, '__call__') and attr_name in method_names:
                    return method_decorator(obj)
                return obj

        return NewClass
    return class_rebuilder


@class_decorator('first_method', 'second_method')
class MySecondClass(object):
    """
    This class is decorated
    """
    def first_method(self, *args, **kwargs):
        print "\t\tthis is the MySecondClass.first_method"

    def second_method(self, *args, **kwargs):
        print "\t\tthis is the MySecondClass.second_method"

if __name__ == "__main__":
    print "::: With a decorated class :::"
    z = MySecondClass()
    z.first_method()
    z.second_method()

Run the code, and the result that comes out is the following:

[apalma@insys tests] python class_decorator.py 
::: With a decorated class :::
	Inside the decorator
		this is the MySecondClass.first_method
	Inside the decorator
		this is the MySecondClass.second_method

As you can see, the output and the program flow when calling the class methods are equivalent in both situations.
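To close the loop with the django-piston problem that motivated all this: the same pattern can enforce authentication on every handler method without decorating each one. The handler and the login_required-style decorator below are just an illustrative sketch under my own names, not django-piston’s actual API:

from django.core.exceptions import PermissionDenied

def login_required(fn):
    "Reject the call when the request is not authenticated"
    def decorator(request, *args, **kwargs):
        if not request.user.is_authenticated():
            raise PermissionDenied
        # fn is already a bound method here, so there is no need to pass self
        return fn(request, *args, **kwargs)
    return decorator


def authentication_required(*method_names):
    "Class decorator wrapping the given methods with login_required"
    def class_rebuilder(cls):
        class NewClass(cls):
            def __getattribute__(self, attr_name):
                obj = super(NewClass, self).__getattribute__(attr_name)
                if hasattr(obj, '__call__') and attr_name in method_names:
                    return login_required(obj)
                return obj
        return NewClass
    return class_rebuilder


@authentication_required('read', 'create')
class MyHandler(object):
    "A piston-style handler: the listed methods now require authentication"
    def read(self, request):
        return {'answer': 'only for authenticated users'}

    def create(self, request):
        return {'created': True}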

That is it! Hope this can be useful.

Regards

Those who use South to manage their migrations have perhaps wished it were possible to test them without having to mess with their local database. Every time you run a datamigration you want to make sure everything went fine and data integrity was not compromised, which becomes especially important when the migration deals with important records and the consequences of something going wrong can be catastrophic.

Unfortunately South doesn’t provide any way to test its migrations, so when you have to write one you usually find yourself testing it on your local machine against your own development data; if something fails you have to restore your previous data and set up your testing environment again before you can try once more. If you have ever experienced this, you know this repetitive process can become quite a big pain.
Django, however, has a very good testing framework, which becomes even better with django-nose, and not taking advantage of it when testing migrations sounds a bit like a waste of resources.

This was the sort of problem I came across this week, and after a bit of research I found a way to unit-test datamigrations inside a Django test case.

So imagine you have this migration:

# file: myapp/migrations/0002_some_migration.py
import datetime
from south.db import db
from south.v2 import DataMigration
from django.db import models

class Migration(DataMigration):

    def forwards(self, orm):
        "Write your forwards methods here."
        # access your orm here
        Model1 = orm.Model1

The test would look like:

# file: myapp/tests.py
from django.test import TestCase
from south.migration import Migrations
from myapp.models import Model1

class TestOrm(object):
    """
    This class will be used to mock the orm object which is passed as a
    param to the forwards() method
    """
    def __init__(self):
        self.Model1 = Model1


class MyMigrationTest(TestCase):

    def _pick_migration(self, app_label, migration_name):
        """
        This method will pick a migration object
        """
        migrations = Migrations(app_label)
        for migration in migrations:
            if migration.full_name().split('.')[-1] == migration_name:
                return migration
        return None

    def test_migration_0002_some_migration(self):
        self.migration = self._pick_migration('myapp', '0002_some_migration')
        if not self.migration:
            return

        # running migrations
        orm = TestOrm()
        self.migration.migration_instance().forwards(orm)
        # assertions here
        # self.assertEquals(......)
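To make the “assertions here” part concrete, you can create a few records before running forwards() and check them afterwards. Here is a minimal sketch of another test method inside MyMigrationTest, assuming Model1 has a hypothetical name field that the migration is supposed to capitalise (the field and the expected value are only illustrative):

    def test_migration_0002_data_integrity(self):
        migration = self._pick_migration('myapp', '0002_some_migration')
        if not migration:
            return

        # 'name' and the expected value are hypothetical, only to show the idea
        record = Model1.objects.create(name='andre')
        migration.migration_instance().forwards(TestOrm())

        record = Model1.objects.get(pk=record.pk)
        self.assertEquals(record.name, 'Andre')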

And that’s it.
This of course only covers datamigrations, not schema migrations, but I hope it can still be useful.

I don’t know whether it would be appropriate to build a proper way to test South migrations into Django, but if it is, I think it would be an interesting feature.

Regards

It came to my mind to write about this subject after a friend of mine asked me how he could sort a list of objects; coincidentally, I had a similar problem last week, so I decided to write a blog post to help those who might still be looking for an answer. This is not about algorithms but about the Python data model and how to use it for sorting.

Imagine you have the following:

class Student(object):
    def __init__(self, name=None, mark=None):
        self.name = name
        self.mark = mark

student_list = [
    Student(name="Andre", mark=10),
    Student(name="da", mark=5),
    Student(name="Palma", mark=7),
]

and you are doing something like:

for x in xrange(0, len(student_list)):
    for i in xrange(x+1, len(student_list)):
        if student_list[i].mark > student_list[x].mark:
            student_list[i], student_list[x] = student_list[x], student_list[i]

then you are doing it wrong: it is not Pythonic and you are not making use of any of the cool features Python gives you.
Ideally you should use the sorted() built-in function. You can do it using one of the following methods.

1. Specifying a lambda.

student_list = sorted(student_list, key=lambda student: student.mark)

Also, as you may know, a lambda is just a callable, and any function that returns a callable works as well, so you can do something like this:

def useattr(attr):
    def kicker(obj):
        return getattr(obj, attr)
    return kicker

student_list = sorted(student_list, key=useattr('mark'))

In fact, this is not that different from using operator.attrgetter.
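For reference, attrgetter from the standard operator module does exactly that:

from operator import attrgetter

student_list = sorted(student_list, key=attrgetter('mark'))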

2. Specifying a compare method in the class. That would make the Student class look something like this:

class Student(object):
    def __cmp__(self, other):
        if self.mark > other.mark:
            return 1
        elif self.mark < other.mark:
            return -1
        else:
            return 0

    def __init__(self, name=None, mark=None):
        self.name = name
        self.mark = mark

student_list = [
    Student(name="Andre", mark=10),
    Student(name="da", mark=5),
    Student(name="Palma", mark=7),
]

student_list = sorted(student_list)

And that’s all. Have a look at this Python doc page for more.

Regards

During these last months I’ve been involved in a project written in Python and using the Django web framework. Developers know that tests are a very important part of a project’s code and have to be taken seriously to avoid bad surprises in the future.

Luckily Django has a very good testing environment which makes it very easy for developers to write their tests.

As you may know, every time you run a test in Django it creates a testing database, and as the project grows you get more and more models, which obviously makes the creation of this testing database slower and slower; after a certain point this can get really painful and frustrating. Developers who have worked with MySQL know exactly what I am talking about.

During this last week I have been writing a couple of tests for which it was impossible to use an in-memory SQLite database, since we had a MySQL-specific piece of code that needed to be tested, and that’s where my nightmare began.

After some hours of frustration I did some research and found mysql-ram. This script is just amazing: it points the MySQL data directory to a tmpfs partition, in other words it mounts the data directory in memory, which increases the speed drastically. The database will no longer write to the normal ext3/4 partition; instead it will write your data to memory.

The results are absolutely impressive. I had test runs that came down from 77 seconds to less than 3. Try it yourself.

Open the file, go to lines 22, 23 and 24 and configure your server. You might want to assign a different port number for your test database server.

22 BIND_SOCKET=/var/run/mysqld/mysqld-ram.sock
23 BIND_HOST=127.0.0.1
24 BIND_PORT=3307

Now, to run it, all you need to do is give it execute permission and run it as a normal bash script.

[apalma@insys aproject] chmod +x mysql-ram.sh
[apalma@insys aproject] sudo ./mysql-ram.sh

Now, in your project root, create a new settings_test.py file:

from settings import *

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'my_db',
        'USER': 'my_user', # Not used with sqlite3.
        'PASSWORD': '', # Not used with sqlite3.
        'HOST': '127.0.0.1', # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '3307', # THIS IS THE PORT WHICH MySQL-RAM IS USING
    },
}

Ok!! All set, you can now run the Django test command.

[apalma@insys aproject] ./manage.py test --settings=settings_test

Hope this can be useful for you as it was for me.

Hi folks! I know I have been a little bit away from writing here, but today I decided to write something about coding, and more particularly about Project Euler problem 14 ( http://projecteuler.net/index.php?section=problems&id=14 ).

I found this problem very interesting and, since I didn’t have much to do, I decided to solve it and write this post.

The biggest challenge of this problem is the magnitude of the numbers we are dealing with (starting values up to 1,000,000). Those who are familiar with the problem and tried a "brute force" approach know that it might take "a while" (A LOT) to finish.

The algorithm of this approach will look something like this:

maximum = 0
max_value = 0
for value in xrange(1, 1000000):
    status = value
    terms = 0
    while status > 1:
        if status % 2 == 0:
            #even
            status = status / 2
        else:
            # odd
            status = status * 3 + 1
        terms += 1
    
    terms += 1
    
    if terms > maximum:
        max_value = value
        maximum = terms
        
print "value: %s   terms: %s" % (max_value, maximum)

Well, don’t even try this, because it will take a very long time to produce a result.

What we need to solve this is a simple cache that returns the number of terms for values we have already computed.

So, the iterative way to solve this problem looks like this:

maximum = 0
max_value = 0
SEQUENCE_CACHE = dict()
for value in xrange(1, 1000000):
    status = value
    terms = 0
    while status > 1:
        try:
            terms += SEQUENCE_CACHE[status] - 1 
            break
        except KeyError:
            pass

        if status % 2 == 0:
            #even
            status = status / 2
        else:
            # odd
            status = status * 3 + 1
        terms += 1

    terms += 1
    SEQUENCE_CACHE[value] = terms

    if terms > maximum:
        max_value = value
        maximum = terms 
        
print "value: %s   terms: %s" % (max_value, maximum)

This one will take a few seconds and solve the problem. As you can see, SEQUENCE_CACHE keeps track of previous results. Using this kind of cache dramatically decreases the number of iterations required to compute the number of terms for a given value. Try it yourself.

 

As I found this problem interesting, I decided to also write a recursive solution to it.

So, as before, if we try a brute-force recursive approach we will face the same situation: the algorithm will take far too long to finish.

This first (correct but hopelessly slow) approach will be something like:

def euler14(value, status=None, n=1):
    """
        'value' represents the base value
        'status' represents the actual value according with odd and even formulas
        'n' recursion deepness
    """
    if status is None:
        status = value
 
    if status <= 1:
        return n
    if status % 2 == 0:
        #even
        status = status / 2
    else:
        #odd
        status = status * 3 + 1

    return euler14(value, status, n+1)
terms = 0
value = 0
for x in xrange(0, 1000000):
    result = euler14(x)
    if result > terms:
        value = x
        terms = result

print "value: %s   terms: %s" % (value, terms)

This will take a lot of time to finish because it doesn’t take previously known results into account.

So, once again a cache enters the play to solve the issue. If you are familiar with caching and Python, you know that a common use of Python decorators is actually caching, and it comes in the form of @decorator_name. Our case is no different, so the solution looks like this:

SEQUENCE_CACHE = dict()
def cache_euler14(fn):

    def kicker(value, status=None, n=None):
        try:
            SEQUENCE_CACHE[value] = n - 1 + SEQUENCE_CACHE[status]
            return SEQUENCE_CACHE[value]
        except (KeyError, TypeError):
            pass

        if n is None:
            result = fn(value, status)
        else:
            result = fn(value, status, n)
        if status <= 1:
            SEQUENCE_CACHE[value] = result
        return result

    return kicker

@cache_euler14
def euler14(value, status=None, n=1):
    """
        'value' represents the base value
        'status' represents the actual value according with odd and even formulas
        'n' recursion deepness
    """
    if status is None:
        status = value

    if status <= 1:
        return n
    if status % 2 == 0:
        #even
        status = status / 2
    else:
        #odd
        status = status * 3 + 1

    return euler14(value, status, n+1)


terms = 0
value = 0
for x in xrange(0, 1000000):
    result = euler14(x)
    if result > terms:
        value = x
        terms = result

print "value: %s   terms: %s" % (value, terms)

Both solutions presented here will find the answer; no optimization was taken into account. From here it is just a matter of timing the execution of these scripts and trying to optimize them.

[apalma@insys projecteuler] time python euler14_it.py 
value: 837799   terms: 525

real	0m15.777s
user	0m15.601s
sys	0m0.128s

15 seconds… Well, it’s not that hard to do better than this, but I’m tired and I’ll leave it for now. One clue: for the cache, try to use a list() instead of a dictionary.
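To make that clue a bit more concrete, here is a minimal sketch of the iterative version using a list as the cache instead of a dictionary; only starting values below the limit are cached, larger intermediate values are simply recomputed:

LIMIT = 1000000
SEQUENCE_CACHE = [0] * LIMIT   # index is the starting value, 0 means "unknown"

maximum = 0
max_value = 0
for value in xrange(1, LIMIT):
    status = value
    terms = 0
    while status > 1:
        if status < LIMIT and SEQUENCE_CACHE[status]:
            terms += SEQUENCE_CACHE[status] - 1
            break
        if status % 2 == 0:
            # even
            status = status / 2
        else:
            # odd
            status = status * 3 + 1
        terms += 1

    terms += 1
    SEQUENCE_CACHE[value] = terms

    if terms > maximum:
        max_value = value
        maximum = terms

print "value: %s   terms: %s" % (max_value, maximum)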

 

Regards folks

Writing about this topic came to my mind a few weeks ago when I had a couple of problems deploying a Django application to a production server.

Many of us know that when you develop your applications using Django’s development server, once the time comes to make it real and pass it to production, things often don’t go exactly as expected and you end up dealing with problems you didn’t count on. For this reason, last week I started to make my development environments as similar as possible to production by using nginx, which is the web server I use most.

So my objective was to have a remote machine containing the web server and also a repository to which I can push/pull/update (and so on) changes from my desktop machine, with those changes taking immediate effect on the web server.

To achieve this goal I had to install four things on my server:

  • nginx
  • django
  • mercurial
  • postgreSQL

So now it’s time to move on to the configuration.

First create a repository:

$ cd ~/repo
$ hg init project

The folder where you create your repository is arbitrary, but I would recommend using a standard folder such as /var/www.

Now we will create our Django project inside of it.

$ cd project
$ django-admin.py startproject project

At this point we already have our repository with a template of a Django project.

You can add the created files and commit your changes, but if you try to push, it will fail and you will get an error like this:

$ hg commit -m "First commit"
$ hg push
abort: repository default-push not found!

This is because we haven’t configured our repository properly yet, so now it’s time to do it. Go back to the repository root; there you will find a hidden folder .hg, which contains a file called hgrc (attention: I mean hgrc, not .hgrc).

Edit it with your favorite text editor, in my case vim.

$ vim .hg/hgrc
1 [paths]
2 default=~/repo/project

Now it will be possible to push your project with the first commit.

$ hg push

We have completed the first step, but we are not finished yet. Now it’s time to configure Django FastCGI and nginx. If you don’t know how to do it, try to google it or go to this example.

Go to the Django project inside your repository and start the FastCGI process. It is important to pass a pidfile to it.

$ cd ~/repo/project/project
$ python manage.py runfcgi socket=/tmp/project.sock pidfile=/tmp/project.pid

If you followed the last example you should be able to start nginx with Django FastCGI, and at this point your web server is already serving the Django project that lives inside your repository.

You should already be able to clone this project to another machine, but regardless of the changes you make there, once you try to push from that machine two things will happen:

  1. The project on the web server will not be updated
  2. Nginx will not get restarted and the previous project will keep being served.

To work around these two issues we will use Mercurial hooks, which are commands that are triggered and executed when certain events happen in Mercurial.

So go back to your hgrc file, where you can configure them. Once again, use your favorite text editor to do this.

$ cd ~/repo/project
$ vim .hg/hgrc
1 [paths]
2 default=~/repo/project
3
4 [hooks]
5 changegroup.update = hg update
6 incoming = ~/repo/project/hook.sh

Read the hooks documentation to learn more about what changegroup and incoming do, but if you look at it, what it does should be fairly obvious. Line 5 will solve issue number 1, and line 6 has a path to a shell script that will solve problem number 2.

All we have to do now is to write that shell script.

$ cd ~/repo/project
$ vim hook.sh
1 PIDFILE="/tmp/project.pid"
2 PROJECT_PATH="$HOME/repo/project/project"
3 SOCKET_FILE="/tmp/project.sock"
4 pid=`cat $PIDFILE`
5 kill $pid
6 python $PROJECT_PATH/manage.py runfcgi socket=$SOCKET_FILE pidfile=$PIDFILE
7 /etc/init.d/nginx restart

You may need to give execute permission to this script.

$ chmod +x ~/repo/project/hook.sh

Now, if everything went OK, you are done. You can clone the project from other machines, and once you push, this shell script will kill the last FastCGI process, start a new one and restart nginx, so your changes take immediate effect on the web server.

This is only a basic tutorial for a really simple situation, but from here you should have the basics and be able to adapt it to your own setup.

Hi all! I know I’ve been quite far from this “blog life”, but these last days I’ve been thinking about writing something, and today I did something that I think is a proper thing to post here.

It is my first HTTP streaming experiment.

Streaming is a word that anyone who spends some time on the internet has heard or read many times, and it always caught my attention and curiosity, so I decided to go a little further with the research, not only to learn what it really is but to actually build something, or in other words, to put my hands to work.

So, the method I used was the old but still used AJAX polling streaming technique, which is quite simple: after the server receives a request from the browser, it checks if there is something to respond with; otherwise it holds the connection until it has something, and then sends the information back to the browser. After processing it, the browser starts a new connection with the server, and so on.

After figuring out what the concept was about, I decided to do a little experiment. The point was to have a small interface with a textarea to write in, and whatever is written gets streamed to a content box in the same page.

To do this i used:

- jQuery

- Python/Django

- Django PISTON REST framework

So what I did was a really simple and dumb application where the textarea content is stored in a file, and the AJAX polling reads the content of that same file.

For those who want to implement this, I must advise that it is impossible to do using Django’s development server, due to its inability to handle two requests at the same time; remember, Django’s development server is a single-user, single-process and single-thread server, so configure Apache or any other multi-threaded HTTP server in order to get things working.

The code is available here: http://bitbucket.org/andrefsp/serverpush/src

* In app/handlers.py you will find the resource that makes everything possible. This file implements a django-piston (http://bitbucket.org/jespern/django-piston/wiki/Home) BaseHandler class which responds to GET requests with the read method and to POST requests with the create method.

* static/js/serverpush.js contains the JavaScript/jQuery code for the front-end.

How does this work??

After a user writes something in the textarea, its content is transmitted via an HTTP POST request, which is handled by Streaming.create() and stores the received information in a file.

When the user clicks on the streaming button, it triggers an HTTP GET request that is handled by the Streaming.read() method; this calls streaming.stream(), which holds the connection until it has something to respond with.
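To give an idea of what this looks like on the server side, here is a rough sketch of such a handler. The file path, the field name and the sleep-based loop are illustrative assumptions, not the exact code from the repository:

# file: app/handlers.py -- illustrative sketch only
import os
import time

from piston.handler import BaseHandler

STREAM_FILE = '/tmp/stream.txt'   # hypothetical location of the shared file


class StreamingHandler(BaseHandler):
    allowed_methods = ('GET', 'POST')

    def create(self, request):
        # POST: store whatever was typed in the textarea into the file
        with open(STREAM_FILE, 'w') as fp:
            fp.write(request.POST.get('content', ''))
        return 'stored'

    def read(self, request):
        # GET: hold the connection until the file has content,
        # then send it back to the browser (long polling)
        while True:
            if os.path.exists(STREAM_FILE) and os.path.getsize(STREAM_FILE):
                with open(STREAM_FILE) as fp:
                    return fp.read()
            time.sleep(0.5)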

As I said before, this is a really simple way to do streaming, and if you want to do something really cool it is probably better to take advantage of the new WebSockets introduced in HTML5.

I believe that in a few years a lot of web applications will be using this new HTML5 feature, which avoids a lot of complications and “tricks” to get things done.

See ya!
