Initial implementation of importencoded
authorDiane Trout <diane@ghic.org>
Mon, 11 Apr 2016 22:48:33 +0000 (15:48 -0700)
committerDiane Trout <diane@ghic.org>
Mon, 11 Apr 2016 22:48:33 +0000 (15:48 -0700)
This version reads from a JSON dump file and loads it
into PostgreSQL using SQLAlchemy.

Considering that the rest of htsworkflow uses Django, it might
be better to switch to Django's ORM, and also to take advantage
of the requests library.

encode_submission/importencoded.py [new file with mode: 0644]

diff --git a/encode_submission/importencoded.py b/encode_submission/importencoded.py
new file mode 100644 (file)
index 0000000..cb2bc14
--- /dev/null
@@ -0,0 +1,90 @@
+import argparse
+import logging
+import collections
+import json
+import pprint
+
+logger = logging.getLogger('ImportEncoded')
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy.dialects.postgresql import UUID, JSONB
+from sqlalchemy.orm import sessionmaker
+
+Base = declarative_base()
+
+class Item(Base):
+    __tablename__ = 'item'
+
+    uuid = Column(UUID, primary_key=True)
+    uri = Column(String)
+    object_type = Column(String)
+    payload = Column(JSONB)
+
+
+def create_item(row):
+    uuid = row['uuid']
+    uri = row['@id']
+    object_type = row['@type'][0]
+
+    payload = row.copy()
+    del payload['@id']
+    del payload['uuid']
+    del payload['@type']
+
+    return Item(uri=uri, uuid=uuid, object_type=object_type, payload=payload)
+
+
+def create_session(engine):
+    session = sessionmaker(bind=engine)
+    return session
+    
+def load_data(session, graph):
+    seen_pkeys = set()
+    duplicates = {}
+
+    for i, row in enumerate(graph):
+        obj_id = row['uuid']
+        if obj_id not in seen_pkeys:
+            session.add(create_item(row))
+            seen_pkeys.add(obj_id)
+        else:
+            duplicates.setdefault(obj_id, []).append(row)
+
+        if (i + 1) % 10000 == 0:
+            session.commit()
+            print("{} of {}".format(i+1, len(graph)))
+
+    return duplicates
+
+def load_dump(filename):
+    logger.info("Creating schema")
+    engine = create_engine('postgresql://felcat.caltech.edu/encoded')
+    Base.metadata.create_all(engine)
+    sessionfactory = sessionmaker(bind=engine)
+    session = sessionfactory()
+    
+    logger.info("Parsing %s", filename)
+    with open(filename, 'r') as instream:
+        data = json.load(instream)
+
+    graph = data['@graph']
+    logging.info("Loading")
+    collisions = load_data(session, graph)
+
+    with open('bad.txt', 'a') as outstream:
+        outstream.write(pprint.pformat(collisions))
+
+def main(cmdline=None):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('filename', nargs=1, help='json dump file to load')
+
+    args = parser.parse_args(cmdline)
+
+    logging.basicConfig(level=logging.DEBUG)
+    for filename in args.filename:
+        load_dump(filename)
+
+if __name__ == '__main__':
+    main()
+