
API Testing with Flask: Post

Have you ever tried to test POST requests to an API written with Flask? Today I was going through an old code base writing unit tests that should have been written eons ago when I ran into a snag. The issue stemmed from trying to manually set the Content-Type header.

But before we get to that, let me show you the skeleton of the Flask route I was testing:

@app.route('/someendpoint', methods=['POST'])
def some_endpoint():
    """API endpoint for submitting data to

    :return: status code 405 - invalid JSON or invalid request type
    :return: status code 400 - unsupported Content-Type or invalid publisher
    :return: status code 201 - successful submission
    """
    # Ensure post's Content-Type is supported
    if request.headers['content-type'] == 'application/json':
        # Ensure data is a valid JSON
        try:
            user_submission = json.loads(request.data)
        except ValueError:
            return Response(status=405)
        ... some magic stuff happens
        if everything_went_well:
            return Response(status=201)
        else:
            return Response(status=405)

    # User submitted an unsupported Content-Type
    else:
        return Response(status=400)

Ignoring the magic, everything seems in order. So, let's go ahead and write a quick unit test that posts invalid JSON.

    def test_invalid_JSON(self):
        """Test status code 405 from improper JSON on post to raw"""
        response = self.app.post('/someendpoint',
                                data="This isn't a json... it's a string!")
        self.assertEqual(response.status_code, 405)

Cool, let’s run it!

    Failure
    self.assertEqual(response.status_code, 405)
    AssertionError: 400 != 405

A quick glance at the code and I realize I've forgotten to set the Content-Type header! Wait. How do I set the Content-Type? That's a good question. After searching around for people who had run into similar problems, this is what I came up with:

    def test_invalid_JSON(self):
        """Test status code 405 from improper JSON on post to raw"""
        response = self.app.post('/someendpoint',
                                data="This isn't a json... it's a string!",
                                headers={'content-type': 'application/json'})
        self.assertEqual(response.status_code, 405)

Let’s try this again.

    Failure
    self.assertEqual(response.status_code, 405)
    AssertionError: 400 != 405

Hmmm, okay. Next, I decided to inspect the request coming into the Flask app and found something odd in request.headers:

    Host: localhost
    Content-Length: 10
    Content-Type: 

Why is the Content-Type empty? Another search hinted at another possible solution. Why not just build the headers dict inline?

    headers=dict(content-type='application/json') # But that's not right. We can't have '-' in a key.

By this point I'd become agitated. Neither the Flask docs themselves nor various forums had been of much use. Then I stumbled across this.

Perhaps I missed something in the docs. Either way, I learned that you can hand the Content-Type as a parameter to the post method. It works and it looks much cleaner. Let's revise my initial unit test accordingly:

    def test_invalid_JSON(self):
        """Test status code 405 from improper JSON on post to raw"""
        response = self.app.post('/raw',
                                data="not a json",
                                content_type='application/json')
        self.assertEqual(response.status_code, 405)

And, let's run this one last time and look at the request as it comes through.

    Tests passed
    Host: localhost
    Content-Length: 10
    Content-Type: application/json

Much better. Now, back to testing!
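For reference, here's a minimal, self-contained sketch of the kind of test harness these snippets assume. The myapp module and class name are placeholders of my own, not the actual code base; the key piece is Flask's built-in test client, which is what self.app refers to above.

    import unittest

    from myapp import app  # hypothetical module exposing the Flask app above


    class SomeEndpointTestCase(unittest.TestCase):

        def setUp(self):
            # Flask's test client -- self.app.post(...) in the snippets above
            self.app = app.test_client()

        def test_invalid_JSON(self):
            """Test status code 405 from improper JSON on POST"""
            response = self.app.post('/someendpoint',
                                     data="not a json",
                                     content_type='application/json')
            self.assertEqual(response.status_code, 405)


    if __name__ == '__main__':
        unittest.main()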

-H.


First Steps with Celery: How to Not Trip

Recently, I was tasked with integrating a task queue into a web framework at work. For the purpose of this post, I would like to note that I am operating with Python 2.7.5, Flask 0.9, Celery 3.0.21, and RabbitMQ 3.1.3. This post was written using IPython 0.13.2 in an IPython notebook.

Now, I’ve never implemented a task queue before and boy did that ever make this difficult. A quick search result showed that Celery was the main player in the Python task queue arena.

Before diving into the code base at work I set up a virtualenv and followed Celery’s First Steps with Celery tutorial. It was easy, as was the Next Steps tutorial. I would go so far as to say they were too simple. When I went to apply my freshly earned skills to my code base I ran into a series of walls. Unfortunately, I didn’t have any luck pinging either Celery’s irc channel #celery or their Google group.

But, eventually I figured it out. I’m writing this so that you will (hopefully) avoid similar frustrations. Enjoy!

Picking a Broker

Celery requires a message broker. This broker acts as a middleman, sending and receiving messages to Celery workers who in turn process tasks as they receive them.

Celery recommends using RabbitMQ. I opted for this as my knowledge in this area is limited, and I assumed it would likely have the most thorough and robust documentation.

Installing RabbitMQ in Ubuntu is easy:

    $ sudo apt-get install rabbitmq-server

Installing it on a mac was also rather simple:

    $ brew update
    $ brew install rabbitmq

    # update your path in ~/.bash_profile or ~/.profile with
    PATH=$PATH:/usr/local/sbin

Note: A co-worker ran into issues installing RabbitMQ via homebrew. To resolve this he followed the standalone mac installation instructions here.

Once installed, starting the server is as simple as:

    $ rabbitmq-server
    # or you can start in the background with
    $ rabbitmq-server -detached

And you can stop it with:

    $ rabbitmqctl stop

Installing Celery

Installing Celery was very simple. From within your virtualenv (you should be using virtual environments!):

    $ pip install celery

Setting up Celery config, Celery daemon, and adding ‘tasks’

The steps below are a bit more convoluted than the aforementioned tutorial provided by the Celery team. This is meant to be more of a comprehensive ‘real world’ example. If you would like something simpler, please go here.

Project Structure:

    project/
    project/celeryconfig.py
    project/framework/celery/celery.py
    project/framework/email/email_tasks.py

Celery config — celeryconfig.py

    # config file for Celery Daemon

    # default RabbitMQ broker
    BROKER_URL = 'amqp://'

    # default RabbitMQ backend
    CELERY_RESULT_BACKEND = 'amqp://'

There are a couple of things to note here. First, we are using RabbitMQ as both the broker and the backend. Wait, what is the backend? The backend is the resource which returns the results of a completed task from Celery. Second, you may be wondering what amqp is. AMQP (Advanced Message Queuing Protocol) is the open messaging protocol that RabbitMQ implements. More information on it can be located here.

More information on celery configuration and defaults can be found in the Celery docs.

Celery daemon: Preparing our daemon — celery.py

    from __future__ import absolute_import

    from celery import Celery

    # instantiate Celery object
    celery = Celery(include=[
                             'framework.email.email_tasks'
                            ])

    # import celery config file
    celery.config_from_object('celeryconfig')

    if __name__ == '__main__':
        celery.start()

The two commented portions here can be a bit confusing.

    celery = Celery(include=[
                             'framework.email.email_tasks'
                            ])

Here we are instantiating a Celery object and handing it a list containing the relative (to where you start your Celery daemon!) path to all modules containing Celery tasks.

    celery.config_from_object('celeryconfig')

Next, we are telling that newly instantiated Celery object to import its configuration settings from celeryconfig.

Headache Number One: Celery and relative imports

I'm sad to admit that it took me 15 minutes to figure out why I didn't need celeryconfig.py in the same directory as my celery.py. So, read this and learn from my stupid mistake.

Again, I want to emphasize everything is relative to where the Celery daemon is launched.

  • Our Celery daemon will be launched from the project root, /
  • The config file is located at /celeryconfig.py
  • So the daemon imports the config simply as: celeryconfig
  • The module containing tasks is located several directories deep: /framework/email/email_tasks.py
  • So the daemon imports it as: framework.email.email_tasks
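To make that concrete, here is the launch I'm assuming throughout; it's the same command used in the testing section below, run from the project root so that both modules resolve:

    # run from project/ -- celeryconfig and framework.email.email_tasks both import cleanly
    $ celery -A framework.celery.celery worker -l debug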

Creating a task: Let’s queue up some emails! — email_tasks.py

    from email.mime.text import MIMEText
    import smtplib

    def send_email(to=None, subject=None, message=None):
        """sends email from hairycode-noreply to specified destination

        :param to: string destination address
        :param subject: subject of email
        :param message: body of message

        :return: True if successful
        """
        # prep message
        fro="hairycode-noreply@hairycode.org"
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = fro 
        msg['To'] = to

        # send message
        s = smtplib.SMTP('mail.hairycode.org')
        s.ehlo()
        s.starttls()
        s.ehlo()
        s.login('YOUR_USERNAME', 'YOUR_PASSWORD')
        s.sendmail('hairycode-noreply@hairycode.org', [to], msg.as_string())
        s.quit()
        return True

Making this function into a task is as simple as importing our Celery object and adding a decorator (almost).

Recall that when we instantiated our Celery daemon we handed it a list of relative paths. One of those was to this file ‘framework.email.email_tasks’. When Celery is started it will comb over any files in that list and look for

    @celery.task

So, let’s go ahead and modify our function to meet the spec.

    from email.mime.text import MIMEText

    # import our Celery object
    from framework.celery.celery import celery

    # add the decorator so it knows send_email is a task
    @celery.task
    def send_email(to=None, subject=None, message=None):

    # code removed for brevity

If everything else is in order, your app will be able to add these tasks to the queue by calling either the .delay() or the .apply_async() method. But, before we can do that, let's make sure our RabbitMQ server and Celery daemon are up and running.

Testing Our New Task

Launch RabbitMQ

Launch your RabbitMQ server in the background from the shell

    $ rabbitmq-server -detached

You can ensure it's running in the background by inspecting your processes

    $ ps aux | grep rabbit --color

Which should yield three things

  1. A very, very long output (this is the rabbitmq-server we just launched)
  2. The RabbitMQ daemon that is always running silently: “hairycode 27491 0.0 0.0 599680 156 ?? S 5:24PM 0:00.33 /usr/local/Cellar/rabbitmq/3.1.3/erts-5.10.1/bin/../../erts-5.10.1/bin/epmd -daemon”
  3. And the grep command you just executed: “hrybacki 35327 1.2 0.0 2432768 596 s000 S+ 2:25PM 0:00.00 grep rabbit –color”

Note: If you see more than one of the “long” rabbitmq-server processes running, you will run into issues. If this is the case, stop all RabbitMQ servers

    $ rabbitmqctl stop

and start over. I will provide an example of what can go wrong if there are multiple brokers or Celery daemons running at once.

Launch the Celery daemon

From the project/ directory launch the Celery daemon

    $ celery -A framework.celery.celery worker -l debug

which should give you daemon monitor output along the lines of

     -------------- celery@Harrys-MacBook-Air.local v3.0.21 (Chiastic Slide)
    ---- **** ----- 
    --- * ***  * -- Darwin-12.4.1-x86_64-i386-64bit
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> broker:      amqp://guest@localhost:5672//
    - ** ---------- .> app:         __main__:0x10f5355d0
    - ** ---------- .> concurrency: 4 (processes)
    - *** --- * --- .> events:      OFF (enable -E to monitor this worker)
    -- ******* ---- 
    --- ***** ----- [queues]
     -------------- .> celery:      exchange:celery(direct) binding:celery

    [Tasks]
      . framework.email.email_tasks.send_email

    [2013-07-23 15:46:55,342: DEBUG/MainProcess] consumer: Ready to accept tasks!

-A framework.celery.celery worker

informs Celery which app instance to use and that it will be creating workers. Workers take tasks from the queue, process them, and return the result to the message broker.

-l debug

tells Celery that you want it to display log level debug output for testing purposes. Normally you would use -l info for log level info output.
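For a quieter, day-to-day run, the same launch with info-level logging looks like this:

    $ celery -A framework.celery.celery worker -l info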

Now, let’s make sure we have some Celery workers up and running

    $ ps aux | grep celery --color

Note the concurrency number from when we launched the Celery daemon. This is the number of worker processes (by default, one per core) that should have been launched. The grep output from the previous command should leave you with that many entries similar to

    hairycode       37992   0.1  0.4  2495644  33448 s001  S+    3:20PM   0:00.74 /Users/hairycode/git/staging-celery/venv/bin/python /Users/hairycode/git/staging-celery/venv/bin/celery -A framework.celery.celery worker -l debug

Detailed information about launching the Celery daemon can be found here or from the shell

    $ celery --help

Testing with IPython

Note: I am using IPython from the root directory in the code segment below. You could just as easily, well maybe not easily, use the standard Python interpreter or write a test script in Python. But, IPython is awesome. I like awesome things.

Executing our Task

    # import celery
    import celery

    # import our send_email task
    from framework.email.email_tasks import send_email

    # call our email function
    result = send_email.delay('', 'all your smtp are belong to us', 'somebody set up us the bomb')

    type(result)

If you look at your Celery daemon you can see the task coming in, being processed, returning the result, and even how long it took to execute. For example, the call above gave me the following output

    [2013-07-23 15:48:29,145: DEBUG/MainProcess] Task accepted: framework.email.email_tasks.send_email[09dad9cf-c9fa-4aee-933f-ff54dae39bdf] pid:39336
    [2013-07-23 15:48:30,600: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2013 VMware, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'consumer_cancel_notify': True, u'publisher_confirms': True, u'basic.nack': True}, u'platform': u'Erlang/OTP', u'version': u'3.1.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
    [2013-07-23 15:48:30,601: DEBUG/MainProcess] Open OK!
    [2013-07-23 15:48:30,602: DEBUG/MainProcess] using channel_id: 1
    [2013-07-23 15:48:30,604: DEBUG/MainProcess] Channel open
    [2013-07-23 15:48:30,607: INFO/MainProcess] Task framework.email.email_tasks.send_email[09dad9cf-c9fa-4aee-933f-ff54dae39bdf] succeeded in 1.46279215813s: True

some_task.delay() vs some_task.apply_async()

some_task.delay() is a convenient method of calling your function as it looks like a regular function call. However, it is shorthand for calling some_task.apply_async(); apply_async() is a more powerful and flexible method for calling your tasks. Detailed information on both can be located here.
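A quick sketch of the difference. The destination address is a placeholder of mine, and countdown is one of apply_async's documented execution options:

    # these two calls enqueue the same task
    send_email.delay('someone@example.com', 'subject', 'message body')
    send_email.apply_async(args=['someone@example.com', 'subject', 'message body'])

    # apply_async also takes execution options, e.g. wait 60 seconds before running
    send_email.apply_async(args=['someone@example.com', 'subject', 'message body'],
                           countdown=60)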

Executing our task — more realistically

The AsyncResult is the Celery object that .delay() handed back; the long string following it is the task_id, which the backend (RabbitMQ) uses to look up the result once the worker has completed the task. More often than not you won't hold on to that result and wait for it, since calling .get() on it would hold up our app until the task had completed. That wouldn't make much sense, would it? Rather, you will simply call the delay or apply_async function and let your code continue on like this

    # import celery
    import celery

    # import our send_email task
    from framework.email.email_tasks import send_email

    # call our email function
    send_email.delay('', 'all your smtp are belong to us', 'somebody set up us the bomb')

Remember, we still have the task id. If you want to check the status or result of what we just submitted you can do so by asking the task queue

    # grab the AsyncResult 
    result = celery.result.AsyncResult('09dad9cf-c9fa-4aee-933f-ff54dae39bdf')

    # print the task id
    print result.task_id
    09dad9cf-c9fa-4aee-933f-ff54dae39bdf

    # print the AsyncResult's status
    print result.status
    SUCCESS

    # print the result returned 
    print result.result
    True
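If you genuinely need the return value, AsyncResult also has a get() method; just remember that it blocks your code until the worker finishes (a sketch):

    # block until the worker finishes (or the timeout expires), then return the result
    print result.get(timeout=10)
    True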

This is a very basic run down. If you want much more detailed information on this I would recommend checking out the Calling Tasks section of Celery's documentation.

Headache Number Two: My Celery daemon is only receiving every other task? Wat.

This little bug took me entirely too long to solve. At some point I started noticing that exactly half of the .delay() calls I was making were permanently in a state of PENDING.

For example, running this

    ###IPython output

    from framework.email.email_tasks import send_email

    send_email.delay('', 'all your smtp are belong to us', 'somebody set up us the bomb')

    send_email.delay('', 'all your smtp are belong to us', 'somebody set up us the bomb')

Gave the following output from the Celery daemon

    [2013-07-22 18:18:44,576: DEBUG/MainProcess] Task accepted: tasks.test[0e55bfed-1f05-4700-90fe-af3dba34ced5] pid:7663
    [2013-07-22 18:18:44,583: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2012 VMware, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'consumer_cancel_notify': True, u'publisher_confirms': True, u'basic.nack': True}, u'platform': u'Erlang/OTP', u'version': u'2.8.4'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
    [2013-07-22 18:18:44,585: DEBUG/MainProcess] Open OK!
    [2013-07-22 18:18:44,585: DEBUG/MainProcess] using channel_id: 1
    [2013-07-22 18:18:44,586: DEBUG/MainProcess] Channel open
    [2013-07-22 18:18:44,589: INFO/MainProcess] Task framework.email.email_tasks.send_email[0e55bfed-1f05-4700-90fe-af3dba34ced5] succeeded in 2.0180089473724s: True

0e55bfed-1f05-4700-90fe-af3dba34ced5 was there but af3846a9-4a31-4a8d-99a4-0d990d51ef22 wasn’t.

I restarted my Celery daemon. Same thing.

I restarted my RabbitMQ server. Same thing.

I created an entire new project and followed the First Steps with Celery docs. Same thing.

Confused, I searched around, but I could only find one other person who had encountered something similar, and that issue was over a year old. Note: I tried his solution but it didn't resolve the issue.

The trick was that somewhere along the line I had another set of Celery workers running in the background that were not part of the daemon I had just started running. These workers were taking tasks from the queue and I wasn’t getting them back. I was able to recreate the same bug by having a second instance of RabbitMQ server running.

Remember when I told you to ensure you only had one RabbitMQ server and the correct number of concurrent Celery workers running by checking your processes? This is why. Don't let this happen to you.

Let’s Improve our Setup

Adding logs

Adding logs was pretty straightforward. First, we need to modify our celeryconfig.py to specify where we want our logs:

    # celeryconfig.py

    # default RabbitMQ broker
    BROKER_URL = 'amqp://'

    # default RabbitMQ backend
    CELERY_RESULT_BACKEND = 'amqp://'

    # specify location of log files
    CELERYD_LOG_FILE="/path/to/your/logs/celery.log"

Now, we implement logging within the task itself.

After importing the required function, we grab the logger associated with our Celery app

    logger = get_task_logger(__name__)

Then, at the desired point, log a custom message at log level info. Note: If you wanted to log at another level, e.g. debug, you would use logger.debug(…)

    logger.info('Sending email from: %r, to: %r' % (fro, to))

The resulting email_tasks.py looks like:

    from email.mime.text import MIMEText
    import smtplib

    from framework.celery.celery import celery
    # import the Celery log getter
    from celery.utils.log import get_task_logger

    # grab the logger for the Celery app
    logger = get_task_logger(__name__)

    # the task decorator, as added in the previous section
    @celery.task
    def send_email(to=None, subject=None, message=None):
        """sends email from hairycode-noreply to specified destination

        :param to: string destination address
        :param subject: subject of email
        :param message: body of message

        :return: True if successful
        """
        # prep message
        fro="hairycode-noreply@hairycode.org"
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = fro 
        msg['To'] = to

        # log desired message to info level log
        logger.info('Sending email from: %r, to: %r' % (fro, to))

        # send message
        s = smtplib.SMTP('mail.hairycode.org')
        s.ehlo()
        s.starttls()
        s.ehlo()
        s.login('', '')
        s.sendmail('hairycode-noreply@hairycode.org', [to], msg.as_string())
        s.quit()
        return True

And that’s it! After implementing logging, tasks should be adding your messages to their respective log files e.g.:

    [2013-07-23 15:48:29,145: INFO/MainProcess] Sending email from: hairycode-noreply@hairycode.org, to: 'testymctester@test.com'

Conclusion

Learning Celery has been… frustrating. The above examples barely begin to scratch the surface of what it's capable of. It is an incredibly powerful and configurable tool. I would, however, like to see a more responsive community, but I understand we're all busy people. Queuing tasks is a necessity for any major application and I'm beginning to develop a love-hate relationship with Celery. More to follow?

-H.


SciPy 2013: Day Two

Tutorial Three: An Introduction to scikit-learn (I) – Gaël Varoquaux, Jake Vanderplas, Olivier Grisel

For a long time I’ve been very curious about machine learning. Up to this point it’s appeared to me much like a mystical unicorn. They seem really cool but you never really know much about them. This tutorial provided me with an excellent chance to break that mysticism down.

After a brief introduction to scikit-learn and a refresher on numpy/matplotlib we used IPython notebooks to walk through basic examples of what the suite is capable of. We then moved into a quick overview of what machine learning is and some common tactics for tackling data analysis. Now that we were a bit more familiar with the suite itself and machine learning principles, we moved onto more complex examples.

Again, using IPython notebooks we walked through examples of supervised learning (classification and regression), unsupervised learning (clustering and dimensionality reduction), and using PCA for data visualization. We ended the morning session with a couple of more advanced supervised learning examples (determining numbers of hand written digits and Boston house prices based on various factors) and an advanced unsupervised learning example in which we analyzed over 20,000 text articles to determine from which of four categories they likely originated.

One note for further research: How much data should be used for train vs test data? What factors play a role in this and are there any common standards or practices which researchers follow?

Tutorial Four: Statistical Data Analysis in Python – Christopher Fonnesbeck

Statistics is an area of interest for me. Combine that interest with Python Pandas and you've got an instant winner, right? Not exactly. While the talk was tagged for beginners, it proved to be otherwise.

The speaker clearly had a very strong background in statistics. However, that background didn't translate into an easy-to-follow talk. The statistics language was far above me and most of the room, if my observations were correct. Additionally, the version of pandas he used wasn't the same as the version in the required packages noted in the talk's description. This resulted in the majority of us not being able to follow along in IPython notebooks and being forced to watch him on the projector.

Please, don't mistake this for a whine session. Chris knew his stuff; he was able to answer everyone's questions and smashed some ‘stump the chump’ attempts without batting an eye. But the talk should have been refined and rehearsed, and the versions of the required packages should have been vetted earlier.

You can’t win them all, right?

-H.


SciPy 2013: Day One

Registration:

This year I am fortunate enough to be able to attend SciPy. SciPy is a Python conference focused on scientific programming. A big shout out to the Center for Open Science for making this trip a possibility. This will be the first of a (hopefully) daily blog series in which I will briefly cover how my day went and any lasting impressions it left me with.

The conference organizers made checking in quick and painless. We were served a breakfast buffet that was surprisingly good. Of particular interest to me were the scrambled egg mini-bagels and lemon poppyseed bread slices — yum. Breakfast was followed by a series of tutorials which registrants chose in advance.

Tutorial One: Guide to Symbolic computing with SymPy — Ondřej Certik, Mateusz Paprocki, Aaron Meurer

The SymPy tutorial was … interesting. We simultaneously listened to the instructors discuss and demonstrate common SymPy functions while completing examples in the various IPython notebooks they provided us with. It was fast paced, too fast for me anyway, and we had to skip over a lot of material.

The tutorial docs recommended experience with IPython and ‘basic mathematics.’ However, I was quite surprised how far my definition of basic mathematics was from theirs. Unfortunately, this left me struggling to keep up with the tutorial even from early on. After a mid-session break, we briefly covered Calculus functionality before being introduced to some real world applications. This is where I became utterly lost.

SymPy's original lead developer showed us several examples of his use of SymPy while preparing his dissertation in Chemical Physics. These included Poisson equations, Fast Fourier Transforms (FFTs), and variation with Lagrange multipliers. Don't know what some (any) of those are? It's okay, neither do I. On a side note, they did show an interesting ‘hack’ using JavaScript injection in an IPython notebook which allowed them to manipulate 3D figures.

While the tutorial itself felt a bit unpolished, the instructors knew their stuff. All in all SymPy seems like a really interesting tool which I plan to use. When combined with IPython notebooks, I believe it could create very powerful, long-lasting notes for a variety of math-intensive classes. I'll be testing this out next semester in Physics.

Tutorial Two: IPython in depth — Fernando Perez, Brian Granger

Anyone who has listened to either Fernando or Brian could have told you that this tutorial was going to be good. It was. They provided a solid tutorial environment with IPython notebooks that kept me feeling like I was actively working with them throughout the entire tutorial. Whenever anyone had a question they knew how to answer it quickly and concisely.

A few things I found of paticular interest:

  1. IPython Notebook: If you haven't heard of this, click the link and check it out. I'm not kidding. This is a versatile web tool that is incredibly powerful. For some cool examples of what people have done with the notebook (including writing a book!) click here.
  2. Awesome help functionality: With IPython's built-in help functionality [ ?, ??, %quickref, and %magic ] you can quickly get a syntax-highlighted help description, the source for a module, or even a nifty quick reference guide, mostly eliminating the need to pop out of a notebook or console and visit the online docs.
  3. Kickass debugger: IPython's shell is amazing, but I've found myself using PyCharm for more advanced bits of code while debugging. After learning about IPython's magic %debug and %run -d theprogram.py, that may have changed. They provide very powerful and easy-to-use debugging abilities I wasn't even aware existed.

Day one down. Time for sleep.

-H.


Back to the N-Queens with Backtracking:

Throughout this semester I have been thoroughly enjoying my artificial intelligence class. While the probability stuff can be a bit much for me at times, I absolutely love learning various searching algorithms. For our first project I implemented a genetic algorithm to find a solution for the Traveling Salesperson Problem — in a language I’d never used, Python. While I did have my ups and downs, I learned to love the cleanliness of the language and the welcoming community it has built up. Naturally, I chose Python for my second project.

The N-Queens problem is rather straightforward: how can we place N queens on an NxN board such that no queen can directly attack another? This is rather simple, albeit monotonous, to do by hand on a small board, e.g. 4 or 5 queens. But what about when we have 8, 10, or even 20 queens? Things get a bit more tricky. For simplicity, I chose to solve the problem with a backtracking algorithm.

Here’s a summary of how it works:

  1. Starting from the first column, check every row for a threat from queens above, below, left, right, or diagonal to it. Once you find a safe row for a given column, place a queen there.
  2. Recurse, incrementing one column at a time, until you have either placed a queen in every column or you are unable to safely place a queen in a column. If the latter occurs, you must backtrack to the last known good state.
  3. Note the row of the last queen placed, remove that queen, and recurse on the previous column, beginning one row past where the last queen was.

Before starting to code, I drew out a simple 4-Queens problem on a white board, step by step, and really thought about what was going on, how I wanted to attack the problem, and tricky cases which would need special care. Doing so greatly reduced the number of headaches I could have had by going straight to the old code-and-fix model. Now, if only the entry level C.S. students in my lab would do the same.

Now, down to the good stuff. This is what I came up with:

import sys
import numpy as np

# override default recursion limit
sys.setrecursionlimit(1000000000)


class NQueens:

    def __init__(self, size_of_board):
        self.size = size_of_board
        self.columns = []
        self.num_of_places = 0
        self.num_of_backtracks = 0

    def place(self, startRow=0):
        """ Backtracking algorithm to recursively place queens on the board
            args:
                startRow: the row which it attempts to begin placing the queen
            returns:
                list representing a solution
        """
        # if every column has a queen, we have a solution
        if len(self.columns) == self.size:
            print('Solution found! The board size was: ' + str(self.size))
            print(str(self.num_of_places) + ' total places were made.')
            print(str(self.num_of_backtracks) + ' total backtracks were executed.')
            print(self.columns)
            return self.columns

        # otherwise search for a safe queen location
        else:
            for row in range(startRow, self.size):
                # if a safe location in this column exists
                if self.isSafe(len(self.columns), row):
                    # place a queen at the location
                    self.columns.append(row)
                    self.num_of_places += 1
                    # recursively call place() on the next column
                    return self.place()

            # no safe row was found in this column (the for/else fires), so backtrack
            else:
                # grab the last row to backtrack from
                lastRow = self.columns.pop()
                self.num_of_backtracks += 1
                # recursively call place() from the last known good position, incrementing to the next row
                return self.place(startRow=lastRow + 1)

    def isSafe(self, col, row):
        """Determines if a move is safe.
        args:
            col: column of desired placement
            row: row of desired placement
            self.columns: list of queens presently on the board
        returns:
            True if safe, False otherwise
        """
        # check for threats from each queen currently on board
        for threatRow in self.columns:
            # for readability
            threatCol = self.columns.index(threatRow)
            # check for horizontal/vertical threats
            if row == threatRow or col == threatCol:
                return False
            # check for diagonal threats
            elif threatRow + threatCol == row + col or threatRow - threatCol == row - col:
                return False
        # if we got here, no threats are present and it's safe to place the queen at the (col, row)
        return True

# set the size of the board
n = 17
# instantiate the board and call the backtracking algorithm
queens = NQueens(n)
queens.place(0)

# convert board to numpy array for pretty printing
board = np.array([[' '] * n] * n)
for queen in queens.columns:
    board[queens.columns.index(queen), queen] = 'Q'

print board

And here is a sample output for a 17-Queen solution:

Solution found! The board size was: 17
5374 total places were made.
5357 total backtracks were executed.
[0, 2, 4, 1, 7, 10, 14, 6, 15, 13, 16, 3, 5, 8, 11, 9, 12]
[['Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q']
 [' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'Q' ' ' ' ' ' ' ' ']]

Now, you may have noticed these lines:

# override default recursion limit
sys.setrecursionlimit(1000000000)

After developing a working solution I started receiving “RuntimeError: maximum recursion depth exceeded in cmp” on boards with more than 13 queens. I found this surprising and did some digging: Python has a rather conservative default recursion limit to help guard against runaway recursive calls. My resolution was rather hackish, but manually raising the recursion limit allowed me to bump the solvable board size to 17 queens. This was more than good enough for my class project.

If time permits, I would like to go back and replace the recursion with an explicit stack and see how large of a board it would be able to solve. If time permits.
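Here's a rough sketch of what that might look like (my own, not part of the class project): the same backtracking logic as place()/isSafe() above, but with the recursion replaced by a loop and the columns list doubling as the explicit stack.

def is_safe(columns, col, row):
    """True if a queen at (col, row) is not attacked by any queen already placed."""
    for threat_col, threat_row in enumerate(columns):
        # same row, or same diagonal
        if threat_row == row or abs(threat_row - row) == abs(threat_col - col):
            return False
    return True


def place_iterative(n):
    """Iterative backtracking: columns[c] holds the row of the queen in column c."""
    columns = []
    row = 0  # next candidate row in the current column
    while len(columns) < n:
        # scan the remaining rows of the current column for a safe square
        while row < n and not is_safe(columns, len(columns), row):
            row += 1
        if row < n:
            columns.append(row)  # place the queen and move on to the next column
            row = 0
        elif columns:
            row = columns.pop() + 1  # backtrack: resume one row past the removed queen
        else:
            return None  # no solution exists for this board size
    return columns


print(place_iterative(17))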

More to follow … maybe?

“To understand recursion, you must understand recursion.”

-H.
